You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2017/08/14 16:20:50 UTC
hbase git commit: HBASE-18522 Add RowMutations support to Batch
Repository: hbase
Updated Branches:
refs/heads/branch-1.4 043211760 -> c6f57e0f3
HBASE-18522 Add RowMutations support to Batch
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6f57e0f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6f57e0f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6f57e0f
Branch: refs/heads/branch-1.4
Commit: c6f57e0f382e9dcef48f05da087d12eb0e47e9ad
Parents: 0432117
Author: Jerry He <je...@apache.org>
Authored: Sun Aug 13 18:23:49 2017 -0700
Committer: Jerry He <je...@apache.org>
Committed: Mon Aug 14 09:18:41 2017 -0700
----------------------------------------------------------------------
.../hbase/client/MultiServerCallable.java | 66 +++++++++++++++-----
.../org/apache/hadoop/hbase/client/Table.java | 6 +-
.../hadoop/hbase/protobuf/RequestConverter.java | 6 +-
.../hbase/protobuf/ResponseConverter.java | 35 ++++++++++-
.../hbase/client/TestFromClientSide3.java | 46 ++++++++++++++
.../hadoop/hbase/client/TestMultiParallel.java | 49 ++++++++++++---
6 files changed, 178 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 42c63eb..b2ea941 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -97,12 +98,21 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
- List<CellScannable> cells = null;
- // The multi object is a list of Actions by region. Iterate by region.
+
+ // Pre-size. Presume at least a KV per Action. There are likely more.
+ List<CellScannable> cells =
+ (this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
+
long nonceGroup = multiAction.getNonceGroup();
if (nonceGroup != HConstants.NO_NONCE) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
+ // Index to track RegionAction within the MultiRequest
+ int regionActionIndex = -1;
+ // Map from a created RegionAction for a RowMutations to the original index within
+ // its original list of actions
+ Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+ // The multi object is a list of Actions by region. Iterate by region.
for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
final List<Action<R>> actions = e.getValue();
@@ -110,19 +120,46 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
+ int rowMutations = 0;
+ for (Action<R> action : actions) {
+ Row row = action.getAction();
+ // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
+ // on the one row. We do separate RegionAction for each RowMutations.
+ // We maintain a map to keep track of this RegionAction and the original Action index.
+ if (row instanceof RowMutations) {
+ RowMutations rms = (RowMutations)row;
+ if (this.cellBlock) {
+ // Build a multi request absent its Cell payload. Send data in cellblocks.
+ regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
+ regionActionBuilder, actionBuilder, mutationBuilder);
+ } else {
+ regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms);
+ }
+ regionActionBuilder.setAtomic(true);
+ multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+ regionActionIndex++;
+ rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex());
+ rowMutations++;
+
+ regionActionBuilder.clear();
+ regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
+ HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
+ }
+ }
- if (this.cellBlock) {
- // Presize. Presume at least a KV per Action. There are likely more.
- if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
- // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
- // They have already been handled above. Guess at count of cells
- regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
- regionActionBuilder, actionBuilder, mutationBuilder);
- } else {
- regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
- regionActionBuilder, actionBuilder, mutationBuilder);
+ if (actions.size() > rowMutations) {
+ if (this.cellBlock) {
+ // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
+ // They have already been handled above. Guess at count of cells
+ regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
+ regionActionBuilder, actionBuilder, mutationBuilder);
+ } else {
+ regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
+ regionActionBuilder, actionBuilder, mutationBuilder);
+ }
+ multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+ regionActionIndex++;
}
- multiRequestBuilder.addRegionAction(regionActionBuilder.build());
}
// Controller optionally carries cell data over the proxy/service boundary and also
@@ -140,7 +177,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
throw ProtobufUtil.getRemoteException(e);
}
if (responseProto == null) return null; // Occurs on cancel
- return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
+ return ResponseConverter.getResults(requestProto, rowMutationsIndexMap,
+ responseProto, controller.cellScanner());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 1a6572b..ac9cfbd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -104,12 +104,12 @@ public interface Table extends Closeable {
boolean[] existsAll(List<Get> gets) throws IOException;
/**
- * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends.
+ * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations
* The ordering of execution of the actions is not defined. Meaning if you do a Put and a
* Get in the same {@link #batch} call, you will not necessarily be
* guaranteed that the Get returns what the Put had put.
*
- * @param actions list of Get, Put, Delete, Increment, Append objects
+ * @param actions list of Get, Put, Delete, Increment, Append, RowMutations
* @param results Empty Object[], same size as actions. Provides access to partial
* results, in case an exception is thrown. A null in the result array means that
* the call for that action failed, even after retries
@@ -123,7 +123,7 @@ public interface Table extends Closeable {
* Same as {@link #batch(List, Object[])}, but returns an array of
* results instead of using a results parameter reference.
*
- * @param actions list of Get, Put, Delete, Increment, Append objects
+ * @param actions list of Get, Put, Delete, Increment, Append, RowMutations
* @return the results from the actions. A null in the return array means that
* the call for that action failed, even after retries
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 8163130..32bdcb9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -621,7 +621,8 @@ public final class RequestConverter {
.setMethodName(exec.getMethod().getName())
.setRequest(exec.getRequest().toByteString())));
} else if (row instanceof RowMutations) {
- throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
+ // Skip RowMutations, which has been separately converted to RegionAction
+ continue;
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
@@ -699,7 +700,8 @@ public final class RequestConverter {
.setMethodName(exec.getMethod().getName())
.setRequest(exec.getRequest().toByteString())));
} else if (row instanceof RowMutations) {
- throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
+ // Skip RowMutations, which has been separately converted to RegionAction
+ continue;
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index ba7041e..5b8498d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -79,7 +79,8 @@ public final class ResponseConverter {
/**
* Get the results from a protocol buffer MultiResponse
*
- * @param request the protocol buffer MultiResponse to convert
+ * @param request the original protocol buffer MultiRequest
+ * @param response the protocol buffer MultiResponse to convert
* @param cells Cells to go with the passed in <code>proto</code>. Can be null.
* @return the results that were in the MultiResponse (a Result or an Exception).
* @throws IOException
@@ -87,6 +88,22 @@ public final class ResponseConverter {
public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
final MultiResponse response, final CellScanner cells)
throws IOException {
+ return getResults(request, null, response, cells);
+ }
+
+ /**
+ * Get the results from a protocol buffer MultiResponse
+ *
+ * @param request the original protocol buffer MultiRequest
+ * @param rowMutationsIndexMap
+ * @param response the protocol buffer MultiResponse to convert
+ * @param cells Cells to go with the passed in <code>proto</code>. Can be null.
+ * @return the results that were in the MultiResponse (a Result or an Exception).
+ * @throws IOException
+ */
+ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
+ Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse response, final CellScanner cells)
+ throws IOException {
int requestRegionActionCount = request.getRegionActionCount();
int responseRegionActionResultCount = response.getRegionActionResultCount();
if (requestRegionActionCount != responseRegionActionResultCount) {
@@ -120,8 +137,22 @@ public final class ResponseConverter {
actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion());
}
+ Object responseValue;
+
+ Integer rowMutationsIndex =
+ (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i));
+ if (rowMutationsIndex != null) {
+ // This RegionAction is from a RowMutations in a batch.
+ // If there is an exception from the server, the exception is set at
+ // the RegionActionResult level, which has been handled above.
+ responseValue = response.getProcessed() ?
+ ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
+ ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
+ results.add(regionName, rowMutationsIndex, responseValue);
+ continue;
+ }
+
for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
- Object responseValue;
if (roe.hasException()) {
responseValue = ProtobufUtil.toException(roe.getException());
} else if (roe.hasResult()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 08ccc42..5d7c853 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -314,6 +314,52 @@ public class TestFromClientSide3 {
}
@Test
+ public void testBatchWithRowMutation() throws Exception {
+ LOG.info("Starting testBatchWithRowMutation");
+ final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation");
+ try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) {
+ byte [][] QUALIFIERS = new byte [][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b")
+ };
+ RowMutations arm = new RowMutations(ROW);
+ Put p = new Put(ROW);
+ p.addColumn(FAMILY, QUALIFIERS[0], VALUE);
+ arm.add(p);
+ Object[] batchResult = new Object[1];
+ t.batch(Arrays.asList(arm), batchResult);
+
+ Get g = new Get(ROW);
+ Result r = t.get(g);
+ assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
+
+ arm = new RowMutations(ROW);
+ p = new Put(ROW);
+ p.addColumn(FAMILY, QUALIFIERS[1], VALUE);
+ arm.add(p);
+ Delete d = new Delete(ROW);
+ d.addColumns(FAMILY, QUALIFIERS[0]);
+ arm.add(d);
+ t.batch(Arrays.asList(arm), batchResult);
+ r = t.get(g);
+ assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
+ assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
+
+ // Test that we get the correct remote exception for RowMutations from batch()
+ try {
+ arm = new RowMutations(ROW);
+ p = new Put(ROW);
+ p.addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE);
+ arm.add(p);
+ t.batch(Arrays.asList(arm), batchResult);
+ fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
+ } catch (RetriesExhaustedWithDetailsException e) {
+ String msg = e.getMessage();
+ assertTrue(msg.contains("NoSuchColumnFamilyException"));
+ }
+ }
+ }
+
+ @Test
public void testHTableExistsMethodSingleRegionSingleGet() throws Exception {
// Test with a single region table.
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 484bc0e..d18f560 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -576,10 +576,12 @@ public class TestMultiParallel {
@Test(timeout=300000)
public void testBatchWithMixedActions() throws Exception {
LOG.info("test=testBatchWithMixedActions");
- Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
+ Table table = UTIL.getConnection().getTable(TEST_TABLE);
// Load some data to start
- Object[] results = table.batch(constructPutRequests());
+ List<Put> puts = constructPutRequests();
+ Object[] results = new Object[puts.size()];
+ table.batch(puts, results);
validateSizeAndEmpty(results, KEYS.length);
// Batch: get, get, put(new col), delete, get, get of put, get of deleted,
@@ -601,12 +603,12 @@ public class TestMultiParallel {
// 2 put of new column
Put put = new Put(KEYS[10]);
- put.add(BYTES_FAMILY, qual2, val2);
+ put.addColumn(BYTES_FAMILY, qual2, val2);
actions.add(put);
// 3 delete
Delete delete = new Delete(KEYS[20]);
- delete.deleteFamily(BYTES_FAMILY);
+ delete.addFamily(BYTES_FAMILY);
actions.add(delete);
// 4 get
@@ -620,19 +622,38 @@ public class TestMultiParallel {
// 5 put of new column
put = new Put(KEYS[40]);
- put.add(BYTES_FAMILY, qual2, val2);
+ put.addColumn(BYTES_FAMILY, qual2, val2);
actions.add(put);
- results = table.batch(actions);
+ // 6 RowMutations
+ RowMutations rm = new RowMutations(KEYS[50]);
+ put = new Put(KEYS[50]);
+ put.addColumn(BYTES_FAMILY, qual2, val2);
+ rm.add(put);
+ byte[] qual3 = Bytes.toBytes("qual3");
+ byte[] val3 = Bytes.toBytes("putvalue3");
+ put = new Put(KEYS[50]);
+ put.addColumn(BYTES_FAMILY, qual3, val3);
+ rm.add(put);
+ actions.add(rm);
+
+ // 7 Add another Get to the mixed sequence after RowMutations
+ get = new Get(KEYS[10]);
+ get.addColumn(BYTES_FAMILY, QUALIFIER);
+ actions.add(get);
+
+ results = new Object[actions.size()];
+ table.batch(actions, results);
// Validation
validateResult(results[0]);
validateResult(results[1]);
- validateEmpty(results[2]);
validateEmpty(results[3]);
validateResult(results[4]);
validateEmpty(results[5]);
+ validateEmpty(results[6]);
+ validateResult(results[7]);
// validate last put, externally from the batch
get = new Get(KEYS[40]);
@@ -640,6 +661,17 @@ public class TestMultiParallel {
Result r = table.get(get);
validateResult(r, qual2, val2);
+ // validate last RowMutations, externally from the batch
+ get = new Get(KEYS[50]);
+ get.addColumn(BYTES_FAMILY, qual2);
+ r = table.get(get);
+ validateResult(r, qual2, val2);
+
+ get = new Get(KEYS[50]);
+ get.addColumn(BYTES_FAMILY, qual3);
+ r = table.get(get);
+ validateResult(r, qual3, val3);
+
table.close();
}
@@ -716,8 +748,7 @@ public class TestMultiParallel {
private void validateEmpty(Object r1) {
Result result = (Result)r1;
Assert.assertTrue(result != null);
- Assert.assertTrue(result.getRow() == null);
- Assert.assertEquals(0, result.rawCells().length);
+ Assert.assertTrue(result.isEmpty());
}
private void validateSizeAndEmpty(Object[] results, int expectedSize) {