You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/11/13 21:50:00 UTC
svn commit: r1541703 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-client/src/test/java/org/apache/hadoop/hbase/client/
hbase-server/src/test/java/org/ap...
Author: stack
Date: Wed Nov 13 20:49:59 2013
New Revision: 1541703
URL: http://svn.apache.org/r1541703
Log:
HBASE-9907 Rig to fake a cluster so can profile client behaviors
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Wed Nov 13 20:49:59 2013
@@ -641,8 +641,7 @@ class AsyncProcess<CResult> {
private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
List<Action<Row>> toReplay, int numAttempt, int failureCount,
Throwable throwable,
- HConnectionManager.ServerErrorTracker errorsByServer){
-
+ HConnectionManager.ServerErrorTracker errorsByServer) {
if (toReplay.isEmpty()) {
// it's either a success or a last failure
if (failureCount != 0) {
@@ -789,22 +788,22 @@ class AsyncProcess<CResult> {
StringBuilder sb = new StringBuilder();
sb.append("#").append(id).append(", table=").append(tableName).
- append(", Attempt #").append(numAttempt).append("/").append(numTries).append(" ");
+ append(", attempt=").append(numAttempt).append("/").append(numTries).append(" ");
if (failureCount > 0 || error != null){
- sb.append("failed ").append(failureCount).append(" ops").append(", last exception was: ").
- append(error == null ? "null" : error.getMessage());
- }else {
+ sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
+ append(error == null ? "null" : error);
+ } else {
sb.append("SUCCEEDED");
}
- sb.append(" on server ").append(sn);
+ sb.append(" on ").append(sn);
- sb.append(", tracking started at ").append(startTime);
+ sb.append(", tracking started ").append(startTime);
if (willRetry) {
- sb.append(" - retrying after sleeping for ").append(backOffTime).append(" ms").
- append(", will replay ").append(replaySize).append(" ops.");
+ sb.append(", retrying after ").append(backOffTime).append(" ms").
+ append(", replay ").append(replaySize).append(" ops.");
} else if (failureCount > 0) {
sb.append(" - FAILED, NOT RETRYING ANYMORE");
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Nov 13 20:49:59 2013
@@ -41,10 +41,12 @@ import org.apache.hadoop.hbase.protobuf.
/**
* A cluster connection. Knows how to find the master, locate regions out on the cluster,
- * keeps a cache of locations and then knows how to re-calibrate after they move.
- * {@link HConnectionManager} manages instances of this class. This is NOT a connection to a
- * particular server but to all servers in the cluster. Individual connections are managed at a
- * lower level.
+ * keeps a cache of locations and then knows how to re-calibrate after they move. You need one
+ * of these to talk to your HBase cluster. {@link HConnectionManager} manages instances of this
+ * class. See it for how to get one of these.
+ *
+ * <p>This is NOT a connection to a particular server but to ALL servers in the cluster. Individual
+ * connections are managed at a lower level.
*
* <p>HConnections are used by {@link HTable} mostly but also by
* {@link HBaseAdmin}, and {@link CatalogTracker}. HConnection instances can be shared. Sharing
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Nov 13 20:49:59 2013
@@ -71,7 +71,10 @@ import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
- * <p>Used to communicate with a single HBase table.
+ * <p>Used to communicate with a single HBase table. An implementation of
+ * {@link HTableInterface}. Instances of this class can be constructed directly but it is
+ * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
+ * See {@link HConnectionManager} class comment for an example.
*
* <p>This class is not thread safe for reads nor write.
*
@@ -336,7 +339,7 @@ public class HTable implements HTableInt
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
- ap = new AsyncProcess<Object>(connection, tableName, pool, null,
+ ap = new AsyncProcess<Object>(connection, tableName, pool, null,
configuration, rpcCallerFactory);
this.maxKeyValueSize = this.configuration.getInt(
@@ -1070,7 +1073,7 @@ public class HTable implements HTableInt
throw new IOException(
"Invalid arguments to incrementColumnValue", npe);
}
-
+
RegionServerCallable<Long> callable =
new RegionServerCallable<Long>(connection, getName(), row) {
public Long call() throws IOException {
@@ -1525,7 +1528,7 @@ public class HTable implements HTableInt
@Override
public String toString() {
- return tableName + ", " + connection;
+ return tableName + ";" + connection;
}
/**
@@ -1541,4 +1544,4 @@ public class HTable implements HTableInt
t.close();
}
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed Nov 13 20:49:59 2013
@@ -37,6 +37,7 @@ import java.util.Map;
/**
* Used to communicate with a single HBase table.
+ * Obtain an instance from an {@link HConnection}.
*
* @since 0.21.0
*/
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Wed Nov 13 20:49:59 2013
@@ -31,10 +31,11 @@ import org.apache.hadoop.hbase.ipc.Paylo
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ServiceException;
@@ -65,20 +66,29 @@ class MultiServerCallable<R> extends Reg
int countOfActions = this.multiAction.size();
if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
+ RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
+ ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+ MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
List<CellScannable> cells = null;
// The multi object is a list of Actions by region. Iterate by region.
for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
final List<Action<R>> actions = e.getValue();
- RegionAction.Builder regionActionBuilder;
+ regionActionBuilder.clear();
+ regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
+ HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
+
+
if (this.cellBlock) {
// Presize. Presume at least a KV per Action. There are likely more.
if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
// They have already been handled above. Guess at count of cells
- regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells);
+ regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
+ regionActionBuilder, actionBuilder, mutationBuilder);
} else {
- regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions);
+ regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
+ regionActionBuilder, actionBuilder, mutationBuilder);
}
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
}
@@ -118,4 +128,4 @@ class MultiServerCallable<R> extends Reg
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(getLocation().getServerName()));
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Nov 13 20:49:59 2013
@@ -988,8 +988,8 @@ public final class ProtobufUtil {
* @param increment
* @return the converted mutate
*/
- public static MutationProto toMutation(final Increment increment) {
- MutationProto.Builder builder = MutationProto.newBuilder();
+ public static MutationProto toMutation(final Increment increment,
+ final MutationProto.Builder builder) {
builder.setRow(ZeroCopyLiteralByteString.wrap(increment.getRow()));
builder.setMutateType(MutationType.INCREMENT);
builder.setDurability(toDurability(increment.getDurability()));
@@ -1045,12 +1045,18 @@ public final class ProtobufUtil {
*/
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
throws IOException {
- MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
+ return toMutation(type, mutation, MutationProto.newBuilder());
+ }
+
+ public static MutationProto toMutation(final MutationType type, final Mutation mutation,
+ MutationProto.Builder builder)
+ throws IOException {
+ builder = getMutationBuilderAndSetCommonFields(type, mutation, builder);
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
+ columnBuilder.clear();
columnBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family.getKey()));
- columnBuilder.clearQualifierValue();
for (Cell cell: family.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
valueBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(
@@ -1080,9 +1086,10 @@ public final class ProtobufUtil {
* @return a protobuf'd Mutation
* @throws IOException
*/
- public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation)
+ public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation,
+ final MutationProto.Builder builder)
throws IOException {
- MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
+ getMutationBuilderAndSetCommonFields(type, mutation, builder);
builder.setAssociatedCellCount(mutation.size());
return builder.build();
}
@@ -1095,8 +1102,7 @@ public final class ProtobufUtil {
* @return A partly-filled out protobuf'd Mutation.
*/
private static MutationProto.Builder getMutationBuilderAndSetCommonFields(final MutationType type,
- final Mutation mutation) {
- MutationProto.Builder builder = MutationProto.newBuilder();
+ final Mutation mutation, MutationProto.Builder builder) {
builder.setRow(ZeroCopyLiteralByteString.wrap(mutation.getRow()));
builder.setMutateType(type);
builder.setDurability(toDurability(mutation.getDurability()));
@@ -2254,15 +2260,16 @@ public final class ProtobufUtil {
// Doing this is going to kill us if we do it for all data passed.
// St.Ack 20121205
CellProtos.Cell.Builder kvbuilder = CellProtos.Cell.newBuilder();
- kvbuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(),
+ kvbuilder.setRow(ZeroCopyLiteralByteString.wrap(kv.getRowArray(), kv.getRowOffset(),
kv.getRowLength()));
- kvbuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(),
+ kvbuilder.setFamily(ZeroCopyLiteralByteString.wrap(kv.getFamilyArray(),
kv.getFamilyOffset(), kv.getFamilyLength()));
- kvbuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(),
+ kvbuilder.setQualifier(ZeroCopyLiteralByteString.wrap(kv.getQualifierArray(),
kv.getQualifierOffset(), kv.getQualifierLength()));
kvbuilder.setCellType(CellProtos.CellType.valueOf(kv.getTypeByte()));
kvbuilder.setTimestamp(kv.getTimestamp());
- kvbuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+ kvbuilder.setValue(ZeroCopyLiteralByteString.wrap(kv.getValueArray(), kv.getValueOffset(),
+ kv.getValueLength()));
return kvbuilder.build();
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Wed Nov 13 20:49:59 2013
@@ -218,7 +218,7 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
- builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
builder.setCondition(condition);
return builder.build();
}
@@ -246,7 +246,8 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
- builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
+ MutationProto.newBuilder()));
builder.setCondition(condition);
return builder.build();
}
@@ -265,7 +266,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
return builder.build();
}
@@ -283,7 +284,8 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append,
+ MutationProto.newBuilder()));
return builder.build();
}
@@ -300,7 +302,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutation(ProtobufUtil.toMutation(increment));
+ builder.setMutation(ProtobufUtil.toMutation(increment, MutationProto.newBuilder()));
return builder.build();
}
@@ -318,7 +320,8 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
+ MutationProto.newBuilder()));
return builder.build();
}
@@ -334,7 +337,10 @@ public final class RequestConverter {
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
final RowMutations rowMutations)
throws IOException {
- RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
+ RegionAction.Builder builder =
+ getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
+ ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+ MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
@@ -345,8 +351,11 @@ public final class RequestConverter {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
- MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
- builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
+ mutationBuilder.clear();
+ MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
+ actionBuilder.clear();
+ actionBuilder.setMutation(mp);
+ builder.addAction(actionBuilder.build());
}
return builder;
}
@@ -363,9 +372,11 @@ public final class RequestConverter {
* @throws IOException
*/
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
- final RowMutations rowMutations, final List<CellScannable> cells)
+ final RowMutations rowMutations, final List<CellScannable> cells,
+ final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder,
+ final MutationProto.Builder mutationBuilder)
throws IOException {
- RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type = null;
if (mutation instanceof Put) {
@@ -376,18 +387,20 @@ public final class RequestConverter {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
- MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
+ mutationBuilder.clear();
+ MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
cells.add(mutation);
- builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
+ actionBuilder.clear();
+ regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
- return builder;
+ return regionActionBuilder;
}
- private static RegionAction.Builder getRegionActionBuilderWithRegion(final byte [] regionName) {
- RegionAction.Builder builder = RegionAction.newBuilder();
+ private static RegionAction.Builder getRegionActionBuilderWithRegion(
+ final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
- return builder;
+ regionActionBuilder.setRegion(region);
+ return regionActionBuilder;
}
/**
@@ -484,36 +497,37 @@ public final class RequestConverter {
* @throws IOException
*/
public static <R> RegionAction.Builder buildRegionAction(final byte[] regionName,
- final List<Action<R>> actions)
+ final List<Action<R>> actions, final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder,
+ final MutationProto.Builder mutationBuilder)
throws IOException {
- RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
- ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
for (Action<R> action: actions) {
Row row = action.getAction();
actionBuilder.clear();
actionBuilder.setIndex(action.getOriginalIndex());
+ mutationBuilder.clear();
if (row instanceof Get) {
Get g = (Get)row;
- builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
+ regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
- builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row)));
+ regionActionBuilder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
} else if (row instanceof Delete) {
- builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row)));
+ regionActionBuilder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
} else if (row instanceof Append) {
- builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row)));
+ regionActionBuilder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row, mutationBuilder)));
} else if (row instanceof Increment) {
- builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutation((Increment)row)));
+ regionActionBuilder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation((Increment)row, mutationBuilder)));
} else if (row instanceof RowMutations) {
throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
- return builder;
+ return regionActionBuilder;
}
/**
@@ -533,14 +547,18 @@ public final class RequestConverter {
* @throws IOException
*/
public static <R> RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
- final List<Action<R>> actions, final List<CellScannable> cells)
+ final List<Action<R>> actions, final List<CellScannable> cells,
+ final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder,
+ final MutationProto.Builder mutationBuilder)
throws IOException {
- RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
- ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+ RegionAction.Builder builder =
+ getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
for (Action<R> action: actions) {
Row row = action.getAction();
actionBuilder.clear();
actionBuilder.setIndex(action.getOriginalIndex());
+ mutationBuilder.clear();
if (row instanceof Get) {
Get g = (Get)row;
builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
@@ -548,7 +566,7 @@ public final class RequestConverter {
Put p = (Put)row;
cells.add(p);
builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p)));
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder)));
} else if (row instanceof Delete) {
Delete d = (Delete)row;
int size = d.size();
@@ -560,21 +578,21 @@ public final class RequestConverter {
if (size > 0) {
cells.add(d);
builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d)));
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder)));
} else {
builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d)));
+ setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder)));
}
} else if (row instanceof Append) {
Append a = (Append)row;
cells.add(a);
builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a)));
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a, mutationBuilder)));
} else if (row instanceof Increment) {
Increment i = (Increment)row;
cells.add(i);
builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i)));
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i, mutationBuilder)));
} else if (row instanceof RowMutations) {
continue; // ignore RowMutations
} else {
Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java Wed Nov 13 20:49:59 2013
@@ -22,41 +22,79 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
+import com.google.common.base.Stopwatch;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import com.google.protobuf.ZeroCopyLiteralByteString;
/**
* Test client behavior w/o setting up a cluster.
* Mock up cluster emissions.
*/
@Category(SmallTests.class)
-public class TestClientNoCluster {
+public class TestClientNoCluster extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
private Configuration conf;
+ public static final ServerName META_SERVERNAME =
+ new ServerName("meta.example.org", 60010, 12345);
@Before
public void setUp() throws Exception {
@@ -71,7 +109,7 @@ public class TestClientNoCluster {
* Simple cluster registry inserted in place of our usual zookeeper based one.
*/
static class SimpleRegistry implements Registry {
- final ServerName META_HOST = new ServerName("10.10.10.10", 60010, 12345);
+ final ServerName META_HOST = META_SERVERNAME;
@Override
public void init(HConnection connection) {
@@ -301,4 +339,456 @@ public class TestClientNoCluster {
return this.stub;
}
}
+
+ /**
+ * Fake many regionservers and many regions on a connection implementation.
+ * Lazily hands back one {@link FakeServer} stub per distinct ServerName asked for;
+ * all stubs share the same faked meta map and scanner-id counter.
+ */
+ static class ManyServersManyRegionsConnection
+ extends HConnectionManager.HConnectionImplementation {
+ // All access should be synchronized
+ final Map<ServerName, ClientService.BlockingInterface> serversByClient;
+
+ /**
+ * Map of faked-up rows of a 'meta table'.
+ */
+ final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
+ // Shared counter the fake servers use to mint scanner ids.
+ final AtomicLong sequenceids = new AtomicLong(0);
+ private final Configuration conf;
+
+ ManyServersManyRegionsConnection(Configuration conf, boolean managed,
+ ExecutorService pool, User user)
+ throws IOException {
+ super(conf, managed, pool, user);
+ // Cluster shape comes in via 'hbase.test.*' configs (see run()); defaults let the
+ // class also be used standalone in tests.
+ int serverCount = conf.getInt("hbase.test.servers", 10);
+ this.serversByClient =
+ new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
+ this.meta = makeMeta(Bytes.toBytes(
+ conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
+ conf.getInt("hbase.test.regions", 100),
+ conf.getLong("hbase.test.namespace.span", 1000),
+ serverCount);
+ this.conf = conf;
+ }
+
+ @Override
+ public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
+ // if (!sn.toString().startsWith("meta")) LOG.info(sn);
+ ClientService.BlockingInterface stub = null;
+ synchronized (this.serversByClient) {
+ stub = this.serversByClient.get(sn);
+ if (stub == null) {
+ // First request for this server name: fake it up and cache it.
+ stub = new FakeServer(this.conf, meta, sequenceids);
+ this.serversByClient.put(sn, stub);
+ }
+ }
+ return stub;
+ }
+ }
+
+ /**
+ * Build a successful MultiResponse that mirrors <code>request</code>: one empty Result
+ * per Action, carrying each Action's index so the client can line results back up.
+ * The <code>meta</code> and <code>sequenceids</code> parameters are unused here; they keep
+ * the signature parallel with the other do*Response helpers.
+ */
+ static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+ final AtomicLong sequenceids, final MultiRequest request) {
+ // Make a response to match the request. Act like there were no failures.
+ ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
+ // Per Region.
+ RegionActionResult.Builder regionActionResultBuilder =
+ RegionActionResult.newBuilder();
+ ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
+ for (RegionAction regionAction: request.getRegionActionList()) {
+ // Builders are reused across iterations; clear before refilling.
+ regionActionResultBuilder.clear();
+ // Per Action in a Region.
+ for (ClientProtos.Action action: regionAction.getActionList()) {
+ roeBuilder.clear();
+ // Return empty Result and proper index as result.
+ roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
+ roeBuilder.setIndex(action.getIndex());
+ regionActionResultBuilder.addResultOrException(roeBuilder.build());
+ }
+ builder.addRegionActionResult(regionActionResultBuilder.build());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Fake 'server'.
+ * Implements the ClientService responding as though it were a 'server' (presumes a new
+ * ClientService.BlockingInterface made per server).
+ * Only get (meta only), scan (meta only) and multi are implemented; multi throttles by
+ * throwing RegionTooBusyException above a configured concurrency.
+ */
+ static class FakeServer implements ClientService.BlockingInterface {
+ // Count of multi() calls currently in flight against this fake server.
+ private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
+ private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
+ private final AtomicLong sequenceids;
+ private final long multiPause;
+ private final int tooManyMultiRequests;
+
+ FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+ final AtomicLong sequenceids) {
+ this.meta = meta;
+ this.sequenceids = sequenceids;
+
+ // Pause to simulate the server taking time applying the edits. This will drive up the
+ // number of threads used over in client.
+ this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
+ this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
+ }
+
+ @Override
+ public GetResponse get(RpcController controller, GetRequest request)
+ throws ServiceException {
+ // Only meta lookups are faked; anything else is a test bug.
+ boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
+ request.getRegion().getType());
+ if (!metaRegion) throw new UnsupportedOperationException();
+ return doMetaGetResponse(meta, request);
+ }
+
+ @Override
+ public MutateResponse mutate(RpcController controller,
+ MutateRequest request) throws ServiceException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public ScanResponse scan(RpcController controller,
+ ScanRequest request) throws ServiceException {
+ // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
+ // the server to keep reference by scannerid. TODO.
+ return doMetaScanResponse(meta, sequenceids, request);
+ }
+
+ @Override
+ public BulkLoadHFileResponse bulkLoadHFile(
+ RpcController controller, BulkLoadHFileRequest request)
+ throws ServiceException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public CoprocessorServiceResponse execService(
+ RpcController controller, CoprocessorServiceRequest request)
+ throws ServiceException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public MultiResponse multi(RpcController controller, MultiRequest request)
+ throws ServiceException {
+ // Track in-flight invocations so we can fake an overloaded server.
+ int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
+ try {
+ if (concurrentInvocations >= tooManyMultiRequests) {
+ throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
+ concurrentInvocations));
+ }
+ Threads.sleep(multiPause);
+ return doMultiResponse(meta, sequenceids, request);
+ } finally {
+ this.multiInvocationsCount.decrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Answer a scan of the faked meta: walk the meta map from the request's start row,
+ * emitting up to getNumberOfRows() rows, each holding regioninfo/server/startcode cells.
+ * Mints a new scannerid when the request carries none.
+ */
+ static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+ final AtomicLong sequenceids, final ScanRequest request) {
+ ScanResponse.Builder builder = ScanResponse.newBuilder();
+ int max = request.getNumberOfRows();
+ int count = 0;
+ Map<byte [], Pair<HRegionInfo, ServerName>> tail =
+ request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
+ ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
+ for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
+ // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
+ if (max <= 0) break;
+ if (++count > max) break;
+ HRegionInfo hri = e.getValue().getFirst();
+ ByteString row = ZeroCopyLiteralByteString.wrap(hri.getRegionName());
+ resultBuilder.clear();
+ resultBuilder.addCell(getRegionInfo(row, hri));
+ resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
+ resultBuilder.addCell(getStartCode(row));
+ builder.addResults(resultBuilder.build());
+ // Set more to false if we are on the last region in table.
+ if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
+ else builder.setMoreResults(true);
+ }
+ // If no scannerid, set one.
+ builder.setScannerId(request.hasScannerId()?
+ request.getScannerId(): sequenceids.incrementAndGet());
+ return builder.build();
+ }
+
+ /**
+ * Answer a Get against the faked meta.  Exact-row match first; when absent and the request
+ * asks for closest-row-before, fall back to the last meta row sorting strictly before the
+ * asked-for row.  Always appends a startcode cell so the Result is never wholly empty.
+ */
+ static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
+ final GetRequest request) {
+ ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
+ ByteString row = request.getGet().getRow();
+ Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
+ if (p == null) {
+ if (request.getGet().getClosestRowBefore()) {
+ byte [] bytes = row.toByteArray();
+ // headMap is exclusive of 'bytes' and can legitimately be empty when the asked-for
+ // row sorts before the first meta row; guard lastKey() which would otherwise throw
+ // NoSuchElementException. (toByteArray()/headMap never return null, so the old
+ // null checks were dead code.)
+ SortedMap<byte [], Pair<HRegionInfo, ServerName>> head = meta.headMap(bytes);
+ p = head.isEmpty()? null: head.get(head.lastKey());
+ }
+ }
+ if (p != null) {
+ resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
+ resultBuilder.addCell(getServer(row, p.getSecond()));
+ }
+ resultBuilder.addCell(getStartCode(row));
+ GetResponse.Builder builder = GetResponse.newBuilder();
+ builder.setResult(resultBuilder.build());
+ return builder.build();
+ }
+
+ /**
+ * @param name region name or encoded region name.
+ * @param type which of the two forms <code>name</code> carries.
+ * @return True if we are dealing with a hbase:meta region.
+ * @throws UnsupportedOperationException on any other RegionSpecifierType.
+ */
+ static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
+ switch (type) {
+ case REGION_NAME:
+ return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
+ case ENCODED_REGION_NAME:
+ return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
+ default: throw new UnsupportedOperationException();
+ }
+ }
+
+ // Catalog family/qualifier names pre-wrapped as ByteStrings so cell building below
+ // avoids re-copying the constant byte arrays on every call.
+ private final static ByteString CATALOG_FAMILY_BYTESTRING =
+ ZeroCopyLiteralByteString.wrap(HConstants.CATALOG_FAMILY);
+ private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
+ ZeroCopyLiteralByteString.wrap(HConstants.REGIONINFO_QUALIFIER);
+ private final static ByteString SERVER_QUALIFIER_BYTESTRING =
+ ZeroCopyLiteralByteString.wrap(HConstants.SERVER_QUALIFIER);
+
+ /**
+ * Start a catalog-family cell for <code>row</code>, stamped with the current time;
+ * callers fill in qualifier and value.
+ */
+ static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
+ CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
+ cellBuilder.setRow(row);
+ cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
+ cellBuilder.setTimestamp(System.currentTimeMillis());
+ return cellBuilder;
+ }
+
+ /** Build the info:regioninfo cell for <code>row</code>, value = serialized <code>hri</code>. */
+ static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
+ CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
+ cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
+ cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(hri.toByteArray()));
+ return cellBuilder.build();
+ }
+
+ /** Build the info:server cell for <code>row</code>, value = "host:port" of <code>sn</code>. */
+ static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
+ CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
+ cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
+ cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
+ return cellBuilder.build();
+ }
+
+ /**
+ * Build the info:serverstartcode cell for <code>row</code>.  Note the value is always
+ * META_SERVERNAME's startcode regardless of which server owns the row (see TODO).
+ */
+ static CellProtos.Cell getStartCode(final ByteString row) {
+ CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
+ cellBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(HConstants.STARTCODE_QUALIFIER));
+ // TODO:
+ cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
+ return cellBuilder.build();
+ }
+
+ private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
+
+ /**
+ * Format passed integer. Zero-pad.
+ * Copied from hbase-server PE class and small amendment. Make them share.
+ * @param number
+ * @return Returns zero-prefixed 10-byte wide decimal version of passed
+ * number (Does absolute in case number is negative).
+ */
+ private static byte [] format(final long number) {
+ byte [] b = new byte[10];
+ // Take the absolute value so each 'd % 10' digit stays in '0'..'9'; the javadoc already
+ // promises absolute, but the old code left negative input producing bytes below '0'.
+ long d = Math.abs(number);
+ for (int i = b.length - 1; i >= 0; i--) {
+ b[i] = (byte)((d % 10) + '0');
+ d /= 10;
+ }
+ return b;
+ }
+
+ /**
+ * Make <code>count</code> regions that evenly split the key span. The first region starts
+ * at the empty key and the last ends at the empty key so the whole namespace is covered.
+ * @param count
+ * @param namespaceSpan
+ * @return <code>count</code> regions
+ */
+ private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
+ final long namespaceSpan) {
+ byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
+ byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
+ long interval = namespaceSpan / count;
+ HRegionInfo [] hris = new HRegionInfo[count];
+ for (int i = 0; i < count; i++) {
+ startKey = i == 0? HConstants.EMPTY_BYTE_ARRAY: endKey;
+ // Last region always gets the empty end key -- including when count == 1, a case the
+ // old code missed (its i == 0 branch gave a single-region table end key format(interval),
+ // leaving rows past 'interval' without a region).
+ endKey = i == count - 1? HConstants.EMPTY_BYTE_ARRAY: format((i + 1) * interval);
+ hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
+ }
+ return hris;
+ }
+
+ /**
+ * @param count
+ * @return Return <code>count</code> servernames.
+ * Hostnames are "<i>.example.org" on port 60010 with startcode <i> so each is distinct.
+ */
+ private static ServerName [] makeServerNames(final int count) {
+ ServerName [] sns = new ServerName[count];
+ for (int i = 0; i < count; i++) {
+ sns[i] = new ServerName("" + i + ".example.org", 60010, i);
+ }
+ return sns;
+ }
+
+ /**
+ * Comparator for meta row keys.
+ * Delegates to KeyValue.MetaComparator's row compare so fake meta rows sort exactly as
+ * real hbase:meta rows would.
+ */
+ private static class MetaRowsComparator implements Comparator<byte []> {
+ private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator();
+ @Override
+ public int compare(byte[] left, byte[] right) {
+ return delegate.compareRows(left, 0, left.length, right, 0, right.length);
+ }
+ }
+
+ /**
+ * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
+ * ServerName to return for this row.  Regions are assigned to servers in contiguous runs
+ * of roughly regionCount/serverCount each.
+ * @param hris
+ * @param serverNames
+ * @return Map with faked hbase:meta content in it.
+ */
+ static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
+ final int regionCount, final long namespaceSpan, final int serverCount) {
+ // I need a comparator for meta rows so we sort properly.
+ SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
+ new ConcurrentSkipListMap<byte[], Pair<HRegionInfo,ServerName>>(new MetaRowsComparator());
+ HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
+ ServerName [] serverNames = makeServerNames(serverCount);
+ // Guard the assignment arithmetic: 'per' must be at least 1 (regionCount < serverCount
+ // divided by zero in the old code), and the server index must be clamped to the array
+ // (a regionCount not divisible by serverCount used to walk past the last server).
+ int per = Math.max(1, regionCount / serverCount);
+ int count = 0;
+ for (HRegionInfo hri: hris) {
+ int serverIndex = Math.min(count++ / per, serverNames.length - 1);
+ Pair<HRegionInfo, ServerName> p =
+ new Pair<HRegionInfo, ServerName>(hri, serverNames[serverIndex]);
+ meta.put(hri.getRegionName(), p);
+ }
+ return meta;
+ }
+
+ /**
+ * Code for each 'client' to run.
+ * Writes 'hbase.test.namespace.span' buffered Puts against the shared connection's table,
+ * logging throughput every printInterval rows.
+ * @param c
+ * @param sharedConnection
+ * @throws IOException
+ */
+ static void cycle(final Configuration c, final HConnection sharedConnection) throws IOException {
+ HTableInterface table = sharedConnection.getTable(BIG_USER_TABLE);
+ // Buffer puts client-side so we exercise the multi/batch path rather than one rpc per Put.
+ table.setAutoFlushTo(false);
+ long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
+ long startTime = System.currentTimeMillis();
+ final int printInterval = 100000;
+ try {
+ Stopwatch stopWatch = new Stopwatch();
+ stopWatch.start();
+ for (int i = 0; i < namespaceSpan; i++) {
+ byte [] b = format(i);
+ Put p = new Put(b);
+ p.add(HConstants.CATALOG_FAMILY, b, b);
+ if (i % printInterval == 0) {
+ LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
+ stopWatch.reset();
+ stopWatch.start();
+ }
+ table.put(p);
+ }
+ LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
+ (System.currentTimeMillis() - startTime) + "ms");
+ } finally {
+ // NOTE(review): close() presumably flushes the buffered puts -- confirm against
+ // HTableInterface semantics for setAutoFlushTo(false).
+ table.close();
+ }
+ }
+
+ /**
+ * Drive many client threads against the faked-up cluster.  The constants below size the
+ * fake cluster and are handed to ManyServersManyRegionsConnection via 'hbase.test.*' configs.
+ */
+ @Override
+ public int run(String[] arg0) throws Exception {
+ int errCode = 0;
+ // TODO: Make command options.
+ // How many servers to fake.
+ final int servers = 1;
+ // How many regions to put on the faked servers.
+ final int regions = 100000;
+ // How many 'keys' in the faked regions.
+ final long namespaceSpan = 1000000;
+ // How long to take to pause after doing a put; make this long if you want to fake a struggling
+ // server.
+ final long multiPause = 0;
+ // Check args make basic sense.
+ if ((namespaceSpan < regions) || (regions < servers)) {
+ throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
+ regions + " which must be > servers=" + servers);
+ }
+
+ // Set my many servers and many regions faking connection in place.
+ getConf().set("hbase.client.connection.impl",
+ ManyServersManyRegionsConnection.class.getName());
+ // Use simple kv registry rather than zk
+ getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
+ // When to report fails. Default is we report the 10th. This means we'll see log everytime
+ // an exception is thrown -- usually RegionTooBusyException when we have more than
+ // hbase.test.multi.too.many requests outstanding at any time.
+ getConf().setInt("hbase.client.start.log.errors.counter", 0);
+
+ // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
+ getConf().setInt("hbase.test.regions", regions);
+ getConf().setLong("hbase.test.namespace.span", namespaceSpan);
+ // setInt, not setLong: ManyServersManyRegionsConnection reads this back with conf.getInt().
+ getConf().setInt("hbase.test.servers", servers);
+ getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
+ getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
+ // Let there be ten outstanding requests at a time before we throw RegionBusyException.
+ getConf().setInt("hbase.test.multi.too.many", 10);
+ final int clients = 20;
+
+ // Have them all share the same connection so they all share the same instance of
+ // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
+ final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
+ // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
+ // Share a connection so I can keep counts in the 'server' on concurrency.
+ final HConnection sharedConnection = HConnectionManager.createConnection(getConf()/*, pool*/);
+ try {
+ Thread [] ts = new Thread[clients];
+ for (int j = 0; j < ts.length; j++) {
+ ts[j] = new Thread("" + j) {
+ final Configuration c = getConf();
+
+ @Override
+ public void run() {
+ try {
+ cycle(c, sharedConnection);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ ts[j].start();
+ }
+ // Wait on all clients before tearing down the shared connection.
+ for (int j = 0; j < ts.length; j++) {
+ ts[j].join();
+ }
+ } finally {
+ sharedConnection.close();
+ }
+ return errCode;
+ }
+
+ /**
+ * Run a client instance against a faked up server.
+ * Exits with the code returned by {@link #run(String[])} via ToolRunner.
+ * @param args TODO
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
+ }
+ }
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java Wed Nov 13 20:49:59 2013
@@ -59,6 +59,8 @@ import org.apache.hadoop.hbase.ipc.proto
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -318,7 +320,10 @@ public class TestIPC {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(
- HConstants.EMPTY_BYTE_ARRAY, rm, cells);
+ HConstants.EMPTY_BYTE_ARRAY, rm, cells,
+ RegionAction.newBuilder(),
+ ClientProtos.Action.newBuilder(),
+ MutationProto.newBuilder());
CellScanner cellScanner = CellUtil.createCellScanner(cells);
if (i % 1000 == 0) {
LOG.info("" + i);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java?rev=1541703&r1=1541702&r2=1541703&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java Wed Nov 13 20:49:59 2013
@@ -219,7 +219,8 @@ public class TestProtobufUtil {
mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
Increment increment = ProtobufUtil.toIncrement(proto, null);
- assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(increment));
+ assertEquals(mutateBuilder.build(),
+ ProtobufUtil.toMutation(increment, MutationProto.newBuilder()));
}
/**