You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2017/11/29 02:48:47 UTC
hbase git commit: HBASE-19096 Add RowMutions batch support in
AsyncTable
Repository: hbase
Updated Branches:
refs/heads/master 93b91e2cc -> e67a3699c
HBASE-19096 Add RowMutions batch support in AsyncTable
Signed-off-by: Jerry He <je...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e67a3699
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e67a3699
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e67a3699
Branch: refs/heads/master
Commit: e67a3699c463a9f222e5d1319d35994fea2a153d
Parents: 93b91e2
Author: Jerry He <je...@apache.org>
Authored: Tue Nov 28 18:41:23 2017 -0800
Committer: Jerry He <je...@apache.org>
Committed: Tue Nov 28 18:42:17 2017 -0800
----------------------------------------------------------------------
.../client/AsyncBatchRpcRetryingCaller.java | 36 +++--
.../apache/hadoop/hbase/client/AsyncTable.java | 12 +-
.../hbase/client/MultiServerCallable.java | 62 +++------
.../hbase/shaded/protobuf/RequestConverter.java | 136 +++++++++++++++----
.../hbase/client/TestAsyncTableBatch.java | 19 ++-
5 files changed, 161 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67a3699/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 2ae68c4..52eb821 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -232,27 +232,19 @@ class AsyncBatchRpcRetryingCaller<T> {
}
private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
- List<CellScannable> cells) throws IOException {
+ List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
- // TODO: remove the extra for loop as we will iterate it in mutationBuilder.
- if (!multiRequestBuilder.hasNonceGroup()) {
- for (Action action : entry.getValue().actions) {
- if (action.hasNonce()) {
- multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup());
- break;
- }
- }
- }
- regionActionBuilder.clear();
- regionActionBuilder.setRegion(
- RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
- regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(),
- entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder);
- multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ // multiRequestBuilder will be populated with region actions.
+ // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
+ // action list.
+ RequestConverter.buildNoDataRegionActions(entry.getKey(),
+ entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
+ mutationBuilder, nonceGroup, rowMutationsIndexMap);
}
return multiRequestBuilder.build();
}
@@ -337,8 +329,12 @@ class AsyncBatchRpcRetryingCaller<T> {
}
ClientProtos.MultiRequest req;
List<CellScannable> cells = new ArrayList<>();
+ // Map from a created RegionAction to the original index for a RowMutations within
+ // the original list of actions. This will be used to process the results when there
+ // is RowMutations in the action list.
+ Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
try {
- req = buildReq(serverReq.actionsByRegion, cells);
+ req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, sn);
return;
@@ -353,8 +349,8 @@ class AsyncBatchRpcRetryingCaller<T> {
onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
} else {
try {
- onComplete(serverReq.actionsByRegion, tries, sn,
- ResponseConverter.getResults(req, resp, controller.cellScanner()));
+ onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
+ rowMutationsIndexMap, resp, controller.cellScanner()));
} catch (Exception e) {
onError(serverReq.actionsByRegion, tries, e, sn);
return;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67a3699/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index b3ccb15..fd08aa3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -431,11 +431,11 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
}
/**
- * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
- * execution of the actions is not defined. Meaning if you do a Put and a Get in the same
- * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
- * had put.
- * @param actions list of Get, Put, Delete, Increment, Append objects
+ * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
+ * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
+ * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
+ * Put had put.
+ * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
* @return A list of {@link CompletableFuture}s that represent the result for each action.
*/
<T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
@@ -443,7 +443,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
/**
* A simple version of batch. It will fail if there are any failures and you will get the whole
* result list at once if the operation is succeeded.
- * @param actions list of Get, Put, Delete, Increment, Append objects
+ * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
* @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
*/
default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67a3699/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index ed7e718..4a0ae39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@@ -100,57 +98,29 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
(this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
long nonceGroup = multiAction.getNonceGroup();
- if (nonceGroup != HConstants.NO_NONCE) {
- multiRequestBuilder.setNonceGroup(nonceGroup);
- }
- // Index to track RegionAction within the MultiRequest
- int regionActionIndex = -1;
+
// Map from a created RegionAction to the original index for a RowMutations within
- // its original list of actions
+ // the original list of actions. This will be used to process the results when there
+ // is RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
// The multi object is a list of Actions by region. Iterate by region.
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
final List<Action> actions = e.getValue();
- regionActionBuilder.clear();
- regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
- HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
-
- int rowMutations = 0;
- for (Action action : actions) {
- Row row = action.getAction();
- // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
- // on the one row. We do separate RegionAction for each RowMutations.
- // We maintain a map to keep track of this RegionAction and the original Action index.
- if (row instanceof RowMutations) {
- RowMutations rms = (RowMutations)row;
- if (this.cellBlock) {
- // Build a multi request absent its Cell payload. Send data in cellblocks.
- regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
- regionActionBuilder, actionBuilder, mutationBuilder);
- } else {
- regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms);
- }
- regionActionBuilder.setAtomic(true);
- multiRequestBuilder.addRegionAction(regionActionBuilder.build());
- regionActionIndex++;
- rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex());
- rowMutations++;
- }
+ if (this.cellBlock) {
+ // Send data in cellblocks.
+ // multiRequestBuilder will be populated with region actions.
+ // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
+ // action list.
+ RequestConverter.buildNoDataRegionActions(regionName, actions, cells, multiRequestBuilder,
+ regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
}
-
- if (actions.size() > rowMutations) {
- if (this.cellBlock) {
- // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
- // They have already been handled above. Guess at count of cells
- regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
- regionActionBuilder, actionBuilder, mutationBuilder);
- } else {
- regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
- regionActionBuilder, actionBuilder, mutationBuilder);
- }
- multiRequestBuilder.addRegionAction(regionActionBuilder.build());
- regionActionIndex++;
+ else {
+ // multiRequestBuilder will be populated with region actions.
+ // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
+ // action list.
+ RequestConverter.buildRegionActions(regionName, actions, multiRequestBuilder,
+ regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67a3699/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 4fdc87d..039a5b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@@ -79,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
@@ -623,19 +625,32 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer multi request for a list of actions.
- * Propagates Actions original index.
- *
- * @param regionName
- * @param actions
- * @return a multi request
+ * Create a protocol buffer multi request for a list of actions. Propagates Actions original
+ * index. The passed in multiRequestBuilder will be populated with region actions.
+ * @param regionName The region name of the actions.
+ * @param actions The actions that are grouped by the same region name.
+ * @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
+ * @param regionActionBuilder regionActionBuilder to be used to build region action.
+ * @param actionBuilder actionBuilder to be used to build action.
+ * @param mutationBuilder mutationBuilder to be used to build mutation.
+ * @param nonceGroup nonceGroup to be applied.
+ * @param rowMutationsIndexMap Map of created RegionAction to the original index for a
+ * RowMutations within the original list of actions
* @throws IOException
*/
- public static RegionAction.Builder buildRegionAction(final byte[] regionName,
- final List<Action> actions, final RegionAction.Builder regionActionBuilder,
+ public static void buildRegionActions(final byte[] regionName,
+ final List<Action> actions, final MultiRequest.Builder multiRequestBuilder,
+ final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
- final MutationProto.Builder mutationBuilder) throws IOException {
+ final MutationProto.Builder mutationBuilder,
+ long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
+ regionActionBuilder.clear();
+ RegionAction.Builder builder = getRegionActionBuilderWithRegion(
+ regionActionBuilder, regionName);
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
+ boolean hasNonce = false;
+ List<Action> rowMutationsList = new ArrayList<>();
+
for (Action action: actions) {
Row row = action.getAction();
actionBuilder.clear();
@@ -643,19 +658,21 @@ public final class RequestConverter {
mutationBuilder.clear();
if (row instanceof Get) {
Get g = (Get)row;
- regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
+ builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
- regionActionBuilder.addAction(actionBuilder.
+ builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
} else if (row instanceof Delete) {
- regionActionBuilder.addAction(actionBuilder.
+ builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
} else if (row instanceof Append) {
- regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
+ builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce())));
+ hasNonce = true;
} else if (row instanceof Increment) {
- regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
+ builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce())));
+ hasNonce = true;
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
@@ -667,19 +684,39 @@ public final class RequestConverter {
} else {
cpBuilder.clear();
}
- regionActionBuilder.addAction(actionBuilder.setServiceCall(
+ builder.addAction(actionBuilder.setServiceCall(
cpBuilder.setRow(UnsafeByteOperations.unsafeWrap(exec.getRow()))
.setServiceName(exec.getMethod().getService().getFullName())
.setMethodName(exec.getMethod().getName())
.setRequest(value)));
} else if (row instanceof RowMutations) {
- // Skip RowMutations, which has been separately converted to RegionAction
- continue;
+ rowMutationsList.add(action);
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
- return regionActionBuilder;
+ if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
+ multiRequestBuilder.setNonceGroup(nonceGroup);
+ }
+ multiRequestBuilder.addRegionAction(builder.build());
+
+ // Process RowMutations here. We can not process it in the big loop above because
+ // it will corrupt the sequence order maintained in cells.
+ // RowMutations is a set of Puts and/or Deletes all to be applied atomically
+ // on the one row. We do separate RegionAction for each RowMutations.
+ // We maintain a map to keep track of this RegionAction and the original Action index.
+ for (Action action : rowMutationsList) {
+ RowMutations rms = (RowMutations) action.getAction();
+ RegionAction.Builder rowMutationsRegionActionBuilder =
+ RequestConverter.buildRegionAction(regionName, rms);
+ rowMutationsRegionActionBuilder.setAtomic(true);
+ // Put it in the multiRequestBuilder
+ multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
+ // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
+ // in the overall multiRequest.
+ rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
+ action.getOriginalIndex());
+ }
}
/**
@@ -689,23 +726,35 @@ public final class RequestConverter {
* coming along otherwise. Note that Get is different. It does not contain 'data' and is always
* carried by protobuf. We return references to the data by adding them to the passed in
* <code>data</code> param.
- *
- * <p>Propagates Actions original index.
- *
- * @param regionName
- * @param actions
+ * <p> Propagates Actions original index.
+ * <p> The passed in multiRequestBuilder will be populated with region actions.
+ * @param regionName The region name of the actions.
+ * @param actions The actions that are grouped by the same region name.
* @param cells Place to stuff references to actual data.
- * @return a multi request that does not carry any data.
+ * @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
+ * @param regionActionBuilder regionActionBuilder to be used to build region action.
+ * @param actionBuilder actionBuilder to be used to build action.
+ * @param mutationBuilder mutationBuilder to be used to build mutation.
+ * @param nonceGroup nonceGroup to be applied.
+ * @param rowMutationsIndexMap Map of created RegionAction to the original index for a
+ * RowMutations within the original list of actions
* @throws IOException
*/
- public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
+ public static void buildNoDataRegionActions(final byte[] regionName,
final Iterable<Action> actions, final List<CellScannable> cells,
+ final MultiRequest.Builder multiRequestBuilder,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
- final MutationProto.Builder mutationBuilder) throws IOException {
+ final MutationProto.Builder mutationBuilder,
+ long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
+ regionActionBuilder.clear();
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
regionActionBuilder, regionName);
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
+ RegionAction.Builder rowMutationsRegionActionBuilder = null;
+ boolean hasNonce = false;
+ List<Action> rowMutationsList = new ArrayList<>();
+
for (Action action: actions) {
Row row = action.getAction();
actionBuilder.clear();
@@ -740,11 +789,13 @@ public final class RequestConverter {
cells.add(a);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.APPEND, a, mutationBuilder, action.getNonce())));
+ hasNonce = true;
} else if (row instanceof Increment) {
Increment i = (Increment)row;
cells.add(i);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
+ hasNonce = true;
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
@@ -762,13 +813,40 @@ public final class RequestConverter {
.setMethodName(exec.getMethod().getName())
.setRequest(value)));
} else if (row instanceof RowMutations) {
- // Skip RowMutations, which has been separately converted to RegionAction
- continue;
+ rowMutationsList.add(action);
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
- return builder;
+ if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
+ multiRequestBuilder.setNonceGroup(nonceGroup);
+ }
+ multiRequestBuilder.addRegionAction(builder.build());
+
+ // Process RowMutations here. We can not process it in the big loop above because
+ // it will corrupt the sequence order maintained in cells.
+ // RowMutations is a set of Puts and/or Deletes all to be applied atomically
+ // on the one row. We do separate RegionAction for each RowMutations.
+ // We maintain a map to keep track of this RegionAction and the original Action index.
+ for (Action action : rowMutationsList) {
+ RowMutations rms = (RowMutations) action.getAction();
+ if (rowMutationsRegionActionBuilder == null) {
+ rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder();
+ } else {
+ rowMutationsRegionActionBuilder.clear();
+ }
+ rowMutationsRegionActionBuilder.setRegion(
+ RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
+ rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms,
+ cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder);
+ rowMutationsRegionActionBuilder.setAtomic(true);
+ // Put it in the multiRequestBuilder
+ multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
+ // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
+ // in the overall multiRequest.
+ rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
+ action.getOriginalIndex());
+ }
}
// End utilities for Client
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67a3699/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index c80b27b..489ad1d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -71,6 +71,7 @@ public class TestAsyncTableBatch {
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
+ private static byte[] CQ1 = Bytes.toBytes("cq1");
private static int COUNT = 1000;
@@ -178,9 +179,9 @@ public class TestAsyncTableBatch {
}
@Test
- public void testMixed() throws InterruptedException, ExecutionException {
+ public void testMixed() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
- table.putAll(IntStream.range(0, 5)
+ table.putAll(IntStream.range(0, 7)
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
.collect(Collectors.toList())).get();
List<Row> actions = new ArrayList<>();
@@ -189,8 +190,14 @@ public class TestAsyncTableBatch {
actions.add(new Delete(Bytes.toBytes(2)));
actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
+ RowMutations rm = new RowMutations(Bytes.toBytes(5));
+ rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 100)));
+ rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes((long) 200)));
+ actions.add(rm);
+ actions.add(new Get(Bytes.toBytes(6)));
+
List<Object> results = table.batchAll(actions).get();
- assertEquals(5, results.size());
+ assertEquals(7, results.size());
Result getResult = (Result) results.get(0);
assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
@@ -202,6 +209,12 @@ public class TestAsyncTableBatch {
assertEquals(12, appendValue.length);
assertEquals(4, Bytes.toLong(appendValue));
assertEquals(4, Bytes.toInt(appendValue, 8));
+ assertEquals(100,
+ Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
+ assertEquals(200,
+ Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
+ getResult = (Result) results.get(6);
+ assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
}
public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {