You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/10/05 02:17:12 UTC
svn commit: r1529355 [1/3] - in /hbase/branches/0.96:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-client/src/test/...
Author: stack
Date: Sat Oct 5 00:17:11 2013
New Revision: 1529355
URL: http://svn.apache.org/r1529355
Log:
HBASE-9612 Ability to batch edits destined to different regions
Modified:
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
hbase/branches/0.96/hbase-protocol/README.txt
hbase/branches/0.96/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
hbase/branches/0.96/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
hbase/branches/0.96/hbase-protocol/src/main/protobuf/Client.proto
hbase/branches/0.96/hbase-protocol/src/main/protobuf/RPC.proto
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestResourceFilter.java
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Sat Oct 5 00:17:11 2013
@@ -255,12 +255,14 @@ class AsyncProcess<CResult> {
* @param atLeastOne true if we should submit at least a subset.
*/
public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
- if (rows.isEmpty()){
+ if (rows.isEmpty()) {
return;
}
+ // This looks like we are keying by region but HRegionLocation has a comparator that compares
+ // on the server portion only (hostname + port) so this Map collects regions by server.
Map<HRegionLocation, MultiAction<Row>> actionsByServer =
- new HashMap<HRegionLocation, MultiAction<Row>>();
+ new HashMap<HRegionLocation, MultiAction<Row>>();
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
do {
@@ -321,10 +323,7 @@ class AsyncProcess<CResult> {
* @return the destination. Null if we couldn't find it.
*/
private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
- if (row == null){
- throw new IllegalArgumentException("row cannot be null");
- }
-
+ if (row == null) throw new IllegalArgumentException("row cannot be null");
HRegionLocation loc = null;
IOException locationException = null;
try {
@@ -476,29 +475,29 @@ class AsyncProcess<CResult> {
final int numAttempt,
final HConnectionManager.ServerErrorTracker errorsByServer) {
// Send the queries and add them to the inProgress list
+ // This iteration is by server (the HRegionLocation comparator is by server portion only).
for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
final HRegionLocation loc = e.getKey();
- final MultiAction<Row> multi = e.getValue();
- incTaskCounters(multi.getRegions(), loc.getServerName());
-
+ final MultiAction<Row> multiAction = e.getValue();
+ incTaskCounters(multiAction.getRegions(), loc.getServerName());
Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
@Override
public void run() {
MultiResponse res;
try {
- MultiServerCallable<Row> callable = createCallable(loc, multi);
+ MultiServerCallable<Row> callable = createCallable(loc, multiAction);
try {
res = createCaller(callable).callWithoutRetries(callable);
} catch (IOException e) {
- LOG.warn("The call to the region server failed, we don't know where we stand, " +
- loc.getServerName(), e);
- resubmitAll(initialActions, multi, loc, numAttempt + 1, e, errorsByServer);
+ LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
+ ", resubmitting all since not sure where we are at", e);
+ resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
return;
}
- receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
+ receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
} finally {
- decTaskCounters(multi.getRegions(), loc.getServerName());
+ decTaskCounters(multiAction.getRegions(), loc.getServerName());
}
}
});
@@ -508,12 +507,12 @@ class AsyncProcess<CResult> {
} catch (RejectedExecutionException ree) {
// This should never happen. But as the pool is provided by the end user, let's secure
// this a little.
- decTaskCounters(multi.getRegions(), loc.getServerName());
+ decTaskCounters(multiAction.getRegions(), loc.getServerName());
LOG.warn("The task was rejected by the pool. This is unexpected." +
" Server is " + loc.getServerName(), ree);
// We're likely to fail again, but this will increment the attempt counter, so it will
// finish.
- resubmitAll(initialActions, multi, loc, numAttempt + 1, ree, errorsByServer);
+ resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);
}
}
}
@@ -590,12 +589,11 @@ class AsyncProcess<CResult> {
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
hConnection.updateCachedLocations(tableName,
- rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
+ rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
errorsByServer.reportServerError(location);
-
- List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
- for (List<Action<Row>> actions : rsActions.actions.values()) {
- for (Action<Row> action : actions) {
+ List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
+ for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
+ for (Action<Row> action : e.getValue()) {
if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
true, t, location)) {
toReplay.add(action);
@@ -605,7 +603,7 @@ class AsyncProcess<CResult> {
if (toReplay.isEmpty()) {
LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
- initialActions.size() + "ops, NOT resubmitting, " + location.getServerName());
+ initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
} else {
submit(initialActions, toReplay, numAttempt, errorsByServer);
}
@@ -669,11 +667,11 @@ class AsyncProcess<CResult> {
}
} else { // success
if (callback != null) {
- Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
+ int index = regionResult.getFirst();
+ Action<Row> correspondingAction = initialActions.get(index);
Row row = correspondingAction.getAction();
//noinspection unchecked
- this.callback.success(correspondingAction.getOriginalIndex(),
- resultsForRS.getKey(), row, (CResult) result);
+ this.callback.success(index, resultsForRS.getKey(), row, (CResult) result);
}
}
}
@@ -694,8 +692,7 @@ class AsyncProcess<CResult> {
try {
Thread.sleep(backOffTime);
} catch (InterruptedException e) {
- LOG.warn("Not sent: " + toReplay.size() +
- " operations, " + location, e);
+ LOG.warn("Not sent: " + toReplay.size() + " operations, " + location, e);
Thread.interrupted();
return;
}
@@ -705,10 +702,11 @@ class AsyncProcess<CResult> {
if (failureCount != 0) {
// We have a failure but nothing to retry. We're done, it's a final failure..
LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
- " ops on " + location.getServerName() + " NOT resubmitting." + location);
+ " ops on " + location.getServerName() + " NOT resubmitting. " + location);
} else if (numAttempt > START_LOG_ERRORS_CNT + 1 && LOG.isDebugEnabled()) {
// The operation was successful, but needed several attempts. Let's log this.
- LOG.debug("Attempt #" + numAttempt + "/" + numTries + " is finally successful.");
+ LOG.debug("Attempt #" + numAttempt + "/" + numTries + " finally suceeded, size=" +
+ toReplay.size());
}
}
}
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java Sat Oct 5 00:17:11 2013
@@ -171,6 +171,7 @@ public class ClientSmallScanner extends
ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
try {
+ controller.setPriority(getTableName());
response = getStub().scan(controller, request);
return ResponseConverter.getResults(controller.cellScanner(),
response);
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java Sat Oct 5 00:17:11 2013
@@ -32,7 +32,6 @@ import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
@@ -72,6 +71,8 @@ public class Get extends OperationWithAt
private int storeOffset = 0;
private Filter filter = null;
private TimeRange tr = new TimeRange();
+ private boolean checkExistenceOnly = false;
+ private boolean closestRowBefore = false;
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
@@ -87,6 +88,22 @@ public class Get extends OperationWithAt
this.row = row;
}
+ public boolean isCheckExistenceOnly() {
+ return checkExistenceOnly;
+ }
+
+ public void setCheckExistenceOnly(boolean checkExistenceOnly) {
+ this.checkExistenceOnly = checkExistenceOnly;
+ }
+
+ public boolean isClosestRowBefore() {
+ return closestRowBefore;
+ }
+
+ public void setClosestRowBefore(boolean closestRowBefore) {
+ this.closestRowBefore = closestRowBefore;
+ }
+
/**
* Get all columns from the specified family.
* <p>
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Sat Oct 5 00:17:11 2013
@@ -643,6 +643,7 @@ public class HBaseAdmin implements Abort
.getServerName());
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
try {
+ controller.setPriority(tableName);
ScanResponse response = server.scan(controller, request);
values = ResponseConverter.getResults(controller.cellScanner(), response);
} catch (ServiceException se) {
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Oct 5 00:17:11 2013
@@ -2164,8 +2164,7 @@ public class HConnectionManager {
boolean isStaleDelete = false;
HRegionLocation oldLocation;
synchronized (this.cachedRegionLocations) {
- Map<byte[], HRegionLocation> tableLocations =
- getTableLocations(hri.getTable());
+ Map<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
oldLocation = tableLocations.get(hri.getStartKey());
if (oldLocation != null) {
// Do not delete the cache entry if it's not for the same server that gave us the error.
@@ -2362,6 +2361,7 @@ public class HConnectionManager {
}
}
+
/*
* Return the number of cached region for a table. It will only be called
* from a unit test.
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat Oct 5 00:17:11 2013
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -61,11 +60,10 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -85,9 +83,6 @@ import com.google.protobuf.ServiceExcept
* <p>In case of reads, some fields used by a Scan are shared among all threads.
* The HTable implementation can either not contract to be safe in case of a Get
*
- * <p>To access a table in a multi threaded environment, please consider
- * using the {@link HTablePool} class to create your HTable instances.
- *
* <p>Instances of HTable passed the same {@link Configuration} instance will
* share connections to servers out on the cluster and to the zookeeper ensemble
* as well as caches of region locations. This is usually a *good* thing and it
@@ -959,8 +954,13 @@ public class HTable implements HTableInt
new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
public Void call() throws IOException {
try {
- MultiRequest request = RequestConverter.buildMultiRequest(
+ RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
getLocation().getRegionInfo().getRegionName(), rm);
+ regionMutationBuilder.setAtomic(true);
+ MultiRequest request =
+ MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+ PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
+ pcrc.setPriority(tableName);
getStub().multi(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
@@ -987,6 +987,7 @@ public class HTable implements HTableInt
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), append);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+ rpcController.setPriority(getTableName());
MutateResponse response = getStub().mutate(rpcController, request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
@@ -1013,9 +1014,10 @@ public class HTable implements HTableInt
try {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), increment);
- PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
- MutateResponse response = getStub().mutate(rpcContoller, request);
- return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
+ PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+ rpcController.setPriority(getTableName());
+ MutateResponse response = getStub().mutate(rpcController, request);
+ return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -1074,6 +1076,7 @@ public class HTable implements HTableInt
getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, amount, durability);
PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+ rpcController.setPriority(getTableName());
MutateResponse response = getStub().mutate(rpcController, request);
Result result =
ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
@@ -1142,61 +1145,10 @@ public class HTable implements HTableInt
*/
@Override
public boolean exists(final Get get) throws IOException {
- RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), get.getRow()) {
- public Boolean call() throws IOException {
- try {
- GetRequest request = RequestConverter.buildGetRequest(
- getLocation().getRegionInfo().getRegionName(), get, true);
- GetResponse response = getStub().get(null, request);
- return response.getExists();
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
- }
-
- /**
- * Goal of this inner class is to keep track of the initial position of a get in a list before
- * sorting it. This is used to send back results in the same orders we got the Gets before we sort
- * them.
- */
- private static class SortedGet implements Comparable<SortedGet> {
- protected int initialIndex = -1; // Used to store the get initial index in a list.
- protected Get get; // Encapsulated Get instance.
-
- public SortedGet (Get get, int initialIndex) {
- this.get = get;
- this.initialIndex = initialIndex;
- }
-
- public int getInitialIndex() {
- return initialIndex;
- }
-
- @Override
- public int compareTo(SortedGet o) {
- return get.compareTo(o.get);
- }
-
- public Get getGet() {
- return get;
- }
-
- @Override
- public int hashCode() {
- return get.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof SortedGet)
- return get.equals(((SortedGet)obj).get);
- else
- return false;
- }
+ get.setCheckExistenceOnly(true);
+ Result r = get(get);
+ assert r.getExists() != null;
+ return r.getExists();
}
/**
@@ -1204,100 +1156,26 @@ public class HTable implements HTableInt
*/
@Override
public Boolean[] exists(final List<Get> gets) throws IOException {
- // Prepare the sorted list of gets. Take the list of gets received, and encapsulate them into
- // a list of SortedGet instances. Simple list parsing, so complexity here is O(n)
- // The list is later used to recreate the response order based on the order the Gets
- // got received.
- ArrayList<SortedGet> sortedGetsList = new ArrayList<HTable.SortedGet>();
- for (int indexGet = 0; indexGet < gets.size(); indexGet++) {
- sortedGetsList.add(new SortedGet (gets.get(indexGet), indexGet));
- }
-
- // Sorting the list to get the Gets ordered based on the key.
- Collections.sort(sortedGetsList); // O(n log n)
-
- // step 1: sort the requests by regions to send them bundled.
- // Map key is startKey index. Map value is the list of Gets related to the region starting
- // with the startKey.
- Map<Integer, List<Get>> getsByRegion = new HashMap<Integer, List<Get>>();
-
- // Reference map to quickly find back in which region a get belongs.
- Map<Get, Integer> getToRegionIndexMap = new HashMap<Get, Integer>();
- Pair<byte[][], byte[][]> startEndKeys = getStartEndKeys();
-
- int regionIndex = 0;
- for (final SortedGet get : sortedGetsList) {
- // Progress on the regions until we find the one the current get resides in.
- while ((regionIndex < startEndKeys.getSecond().length) && ((Bytes.compareTo(startEndKeys.getSecond()[regionIndex], get.getGet().getRow()) <= 0))) {
- regionIndex++;
- }
- List<Get> regionGets = getsByRegion.get(regionIndex);
- if (regionGets == null) {
- regionGets = new ArrayList<Get>();
- getsByRegion.put(regionIndex, regionGets);
- }
- regionGets.add(get.getGet());
- getToRegionIndexMap.put(get.getGet(), regionIndex);
- }
+ if (gets.isEmpty()) return new Boolean[]{};
+ if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
- // step 2: make the requests
- Map<Integer, Future<List<Boolean>>> futures =
- new HashMap<Integer, Future<List<Boolean>>>(sortedGetsList.size());
- for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
- Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
- public List<Boolean> call() throws Exception {
- RegionServerCallable<List<Boolean>> callable =
- new RegionServerCallable<List<Boolean>>(connection, getName(),
- getsByRegionEntry.getValue().get(0).getRow()) {
- public List<Boolean> call() throws IOException {
- try {
- MultiGetRequest requests = RequestConverter.buildMultiGetRequest(
- getLocation().getRegionInfo().getRegionName(), getsByRegionEntry.getValue(),
- true, false);
- MultiGetResponse responses = getStub().multiGet(null, requests);
- return responses.getExistsList();
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<List<Boolean>> newCaller().callWithRetries(callable,
- operationTimeout);
- }
- };
- futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
+ for (Get g: gets){
+ g.setCheckExistenceOnly(true);
}
- // step 3: collect the failures and successes
- Map<Integer, List<Boolean>> responses = new HashMap<Integer, List<Boolean>>();
- for (final Map.Entry<Integer, List<Get>> sortedGetEntry : getsByRegion.entrySet()) {
- try {
- Future<List<Boolean>> future = futures.get(sortedGetEntry.getKey());
- List<Boolean> resp = future.get();
-
- if (resp == null) {
- LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
- }
- responses.put(sortedGetEntry.getKey(), resp);
- } catch (ExecutionException e) {
- LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
- } catch (InterruptedException e) {
- LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
- Thread.currentThread().interrupt();
- }
+ Object[] r1;
+ try {
+ r1 = batch(gets);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
}
- Boolean[] results = new Boolean[sortedGetsList.size()];
- // step 4: build the response.
- Map<Integer, Integer> indexes = new HashMap<Integer, Integer>();
- for (int i = 0; i < sortedGetsList.size(); i++) {
- Integer regionInfoIndex = getToRegionIndexMap.get(sortedGetsList.get(i).getGet());
- Integer index = indexes.get(regionInfoIndex);
- if (index == null) {
- index = 0;
- }
- results[sortedGetsList.get(i).getInitialIndex()] = responses.get(regionInfoIndex).get(index);
- indexes.put(regionInfoIndex, index + 1);
+ // translate.
+ Boolean[] results = new Boolean[r1.length];
+ int i = 0;
+ for (Object o : r1) {
+ // batch ensures if there is a failure we get an exception instead
+ results[i++] = ((Result)o).getExists();
}
return results;
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Sat Oct 5 00:17:11 2013
@@ -24,14 +24,17 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ServiceException;
@@ -42,91 +45,74 @@ import com.google.protobuf.ServiceExcept
* @param <R>
*/
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
- private final MultiAction<R> multi;
+ private final MultiAction<R> multiAction;
private final boolean cellBlock;
MultiServerCallable(final HConnection connection, final TableName tableName,
final HRegionLocation location, final MultiAction<R> multi) {
super(connection, tableName, null);
- this.multi = multi;
+ this.multiAction = multi;
setLocation(location);
this.cellBlock = isCellBlock();
}
MultiAction<R> getMulti() {
- return this.multi;
+ return this.multiAction;
}
@Override
public MultiResponse call() throws IOException {
- MultiResponse response = new MultiResponse();
- // The multi object is a list of Actions by region.
- for (Map.Entry<byte[], List<Action<R>>> e: this.multi.actions.entrySet()) {
- byte[] regionName = e.getKey();
- int rowMutations = 0;
- List<Action<R>> actions = e.getValue();
- for (Action<R> action : actions) {
- Row row = action.getAction();
- // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
- // on the one row. We do these a row at a time.
- if (row instanceof RowMutations) {
- RowMutations rms = (RowMutations)row;
- List<CellScannable> cells = null;
- MultiRequest multiRequest;
- try {
- if (this.cellBlock) {
- // Stick all Cells for all RowMutations in here into 'cells'. Populated when we call
- // buildNoDataMultiRequest in the below.
- cells = new ArrayList<CellScannable>(rms.getMutations().size());
- // Build a multi request absent its Cell payload (this is the 'nodata' in the below).
- multiRequest = RequestConverter.buildNoDataMultiRequest(regionName, rms, cells);
- } else {
- multiRequest = RequestConverter.buildMultiRequest(regionName, rms);
- }
- // Carry the cells if any over the proxy/pb Service interface using the payload
- // carrying rpc controller.
- getStub().multi(new PayloadCarryingRpcController(cells), multiRequest);
- // This multi call does not return results.
- response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT);
- } catch (ServiceException se) {
- response.add(regionName, action.getOriginalIndex(),
- ProtobufUtil.getRemoteException(se));
- }
- rowMutations++;
- }
- }
- // Are there any non-RowMutation actions to send for this region?
- if (actions.size() > rowMutations) {
- Exception ex = null;
- List<Object> results = null;
- List<CellScannable> cells = null;
- MultiRequest multiRequest;
- try {
- if (isCellBlock()) {
- // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
- // They have already been handled above.
- cells = new ArrayList<CellScannable>(actions.size() - rowMutations);
- multiRequest = RequestConverter.buildNoDataMultiRequest(regionName, actions, cells);
- } else {
- multiRequest = RequestConverter.buildMultiRequest(regionName, actions);
- }
- // Controller optionally carries cell data over the proxy/service boundary and also
- // optionally ferries cell response data back out again.
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
- ClientProtos.MultiResponse responseProto = getStub().multi(controller, multiRequest);
- results = ResponseConverter.getResults(responseProto, controller.cellScanner());
- } catch (ServiceException se) {
- ex = ProtobufUtil.getRemoteException(se);
- }
- for (int i = 0, n = actions.size(); i < n; i++) {
- int originalIndex = actions.get(i).getOriginalIndex();
- response.add(regionName, originalIndex, results == null ? ex : results.get(i));
- }
+ int countOfActions = this.multiAction.size();
+ if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
+ MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
+ List<CellScannable> cells = null;
+ // The multi object is a list of Actions by region. Iterate by region.
+ for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
+ final byte [] regionName = e.getKey();
+ final List<Action<R>> actions = e.getValue();
+ RegionAction.Builder regionActionBuilder;
+ if (this.cellBlock) {
+ // Presize. Presume at least a KV per Action. There are likely more.
+ if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
+ // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
+ // Cells are gathered into 'cells' for the controller to carry. Guess at count of cells
+ regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells);
+ } else {
+ regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions);
}
+ multiRequestBuilder.addRegionAction(regionActionBuilder.build());
}
- return response;
+ // Controller optionally carries cell data over the proxy/service boundary and also
+ // optionally ferries cell response data back out again.
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+ controller.setPriority(getTableName());
+ ClientProtos.MultiResponse responseProto;
+ ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
+ try {
+ responseProto = getStub().multi(controller, requestProto);
+ } catch (ServiceException e) {
+ return createAllFailedResponse(requestProto, ProtobufUtil.getRemoteException(e));
+ }
+ return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
}
+ /**
+ * @param request
+ * @param t
+ * @return Return a response that has every action in request failed w/ the passed in
+ * exception <code>t</code> -- this will get them all retried after some backoff.
+ */
+ private static MultiResponse createAllFailedResponse(final ClientProtos.MultiRequest request,
+ final Throwable t) {
+ MultiResponse massFailedResponse = new MultiResponse();
+ for (RegionAction rAction: request.getRegionActionList()) {
+ byte [] regionName = rAction.getRegion().getValue().toByteArray();
+ for (ClientProtos.Action action: rAction.getActionList()) {
+ massFailedResponse.add(regionName, new Pair<Integer, Object>(action.getIndex(), t));
+ }
+ }
+ return massFailedResponse;
+ }
/**
* @return True if we should send data in cellblocks. This is an expensive call. Cache the
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java Sat Oct 5 00:17:11 2013
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.Byte
@InterfaceStability.Stable
public class Result implements CellScannable {
private Cell[] cells;
+ private Boolean exists; // if the query was just to check existence.
// We're not using java serialization. Transient here is just a marker to say
// that this is where we cache row if we're ever asked for it.
private transient byte [] row = null;
@@ -108,7 +109,7 @@ public class Result implements CellScann
@Deprecated
public Result(List<KeyValue> kvs) {
// TODO: Here we presume the passed in Cells are KVs. One day this won't always be so.
- this(kvs.toArray(new Cell[kvs.size()]));
+ this(kvs.toArray(new Cell[kvs.size()]), null);
}
/**
@@ -117,7 +118,14 @@ public class Result implements CellScann
* @param cells List of cells
*/
public static Result create(List<Cell> cells) {
- return new Result(cells.toArray(new Cell[cells.size()]));
+ return new Result(cells.toArray(new Cell[cells.size()]), null);
+ }
+
+ public static Result create(List<Cell> cells, Boolean exists) {
+ if (exists != null){
+ return new Result(null, exists);
+ }
+ return new Result(cells.toArray(new Cell[cells.size()]), exists);
}
/**
@@ -126,12 +134,13 @@ public class Result implements CellScann
* @param cells array of cells
*/
public static Result create(Cell[] cells) {
- return new Result(cells);
+ return new Result(cells, null);
}
/** Private ctor. Use {@link #create(Cell[])}. */
- private Result(Cell[] cells) {
+ private Result(Cell[] cells, Boolean exists) {
this.cells = cells;
+ this.exists = exists;
}
/**
@@ -796,4 +805,12 @@ public class Result implements CellScann
public CellScanner cellScanner() {
return CellUtil.createCellScanner(this.cells);
}
+
+ public Boolean getExists() {
+ return exists;
+ }
+
+ public void setExists(Boolean exists) {
+ this.exists = exists;
+ }
}
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Sat Oct 5 00:17:11 2013
@@ -163,6 +163,7 @@ public class ScannerCallable extends Reg
ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
try {
+ controller.setPriority(getTableName());
response = getStub().scan(controller, request);
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
// from client to server will increment this number in both sides. Client passes this
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java Sat Oct 5 00:17:11 2013
@@ -23,7 +23,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
@@ -36,6 +37,15 @@ import com.google.protobuf.RpcController
*/
@InterfaceAudience.Private
public class PayloadCarryingRpcController implements RpcController, CellScannable {
+ /**
+ * Priority to set on this request. Set it here in controller so it is available when composing the
+ * request. This is the ordained way of setting priorities going forward. We will be
+ * undoing the old annotation-based mechanism.
+ */
+ // Currently only multi call makes use of this. Eventually this should be only way to set
+ // priority.
+ private int priority = 0;
+
// TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException
/**
@@ -103,4 +113,26 @@ public class PayloadCarryingRpcControlle
public void startCancel() {
throw new UnsupportedOperationException();
}
+
+ /**
+ * @param priority Priority for this request; should fall roughly in the range
+ * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
+ */
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ /**
+ * @param tn Set priority based off the table we are going against.
+ */
+ public void setPriority(final TableName tn) {
+ this.priority = tn != null && tn.isSystemTable()? HConstants.HIGH_QOS: HConstants.NORMAL_QOS;
+ }
+
+ /**
+ * @return The priority of this request
+ */
+ public int getPriority() {
+ return priority;
+ }
}
\ No newline at end of file
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Sat Oct 5 00:17:11 2013
@@ -55,6 +55,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.codec.Codec;
@@ -211,7 +212,8 @@ public class RpcClient {
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class FailedServerException extends IOException {
+ // Shouldn't this be a DoNotRetryException? St.Ack 10/2/2013
+ public static class FailedServerException extends HBaseIOException {
public FailedServerException(String s) {
super(s);
}
@@ -967,8 +969,12 @@ public class RpcClient {
}
// close the streams and therefore the socket
- IOUtils.closeStream(out);
- this.out = null;
+ if (this.out != null) {
+ synchronized(this.out) {
+ IOUtils.closeStream(out);
+ this.out = null;
+ }
+ }
IOUtils.closeStream(in);
this.in = null;
disposeSasl();
@@ -1002,9 +1008,10 @@ public class RpcClient {
* Note: this is not called from the Connection thread, but by other
* threads.
* @param call
+ * @param priority
* @see #readResponse()
*/
- protected void writeRequest(Call call) {
+ protected void writeRequest(Call call, final int priority) {
if (shouldCloseConnection.get()) return;
try {
RequestHeader.Builder builder = RequestHeader.newBuilder();
@@ -1022,6 +1029,8 @@ public class RpcClient {
cellBlockBuilder.setLength(cellBlock.limit());
builder.setCellBlockMeta(cellBlockBuilder.build());
}
+ // Only pass priority if there is one. Let zero be same as no priority.
+ if (priority != 0) builder.setPriority(priority);
//noinspection SynchronizeOnNonFinalField
RequestHeader header = builder.build();
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
@@ -1380,6 +1389,12 @@ public class RpcClient {
}
}
+ Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
+ Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout)
+ throws InterruptedException, IOException {
+ return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
+ }
+
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
* with the <code>ticket</code> credentials, returning the value.
@@ -1400,12 +1415,12 @@ public class RpcClient {
*/
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
Message returnType, User ticket, InetSocketAddress addr,
- int rpcTimeout)
+ int rpcTimeout, int priority)
throws InterruptedException, IOException {
Call call = new Call(md, param, cells, returnType);
Connection connection =
getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
- connection.writeRequest(call); // send the parameter
+ connection.writeRequest(call, priority); // send the parameter
boolean interrupted = false;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (call) {
@@ -1632,7 +1647,8 @@ public class RpcClient {
}
Pair<Message, CellScanner> val = null;
try {
- val = call(md, param, cells, returnType, ticket, isa, rpcTimeout);
+ val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
+ pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
if (pcrc != null) {
// Shove the results into controller so can be carried across the proxy/pb service void.
if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Sat Oct 5 00:17:11 2013
@@ -406,6 +406,12 @@ public final class ProtobufUtil {
}
}
}
+ if (proto.hasExistenceOnly() && proto.getExistenceOnly()){
+ get.setCheckExistenceOnly(true);
+ }
+ if (proto.hasClosestRowBefore() && proto.getClosestRowBefore()){
+ get.setClosestRowBefore(true);
+ }
return get;
}
@@ -910,6 +916,12 @@ public final class ProtobufUtil {
if (get.getRowOffsetPerColumnFamily() > 0) {
builder.setStoreOffset(get.getRowOffsetPerColumnFamily());
}
+ if (get.isCheckExistenceOnly()){
+ builder.setExistenceOnly(true);
+ }
+ if (get.isClosestRowBefore()){
+ builder.setClosestRowBefore(true);
+ }
return builder.build();
}
@@ -1038,6 +1050,21 @@ public final class ProtobufUtil {
builder.addCell(toCell(c));
}
}
+ if (result.getExists() != null){
+ builder.setExists(result.getExists());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Convert a client existence flag to a protocol buffer Result that carries only that flag
+ *
+ * @param existence the client existence to send
+ * @return the converted protocol buffer Result
+ */
+ public static ClientProtos.Result toResult(final boolean existence) {
+ ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
+ builder.setExists(existence);
return builder.build();
}
@@ -1051,6 +1078,9 @@ public final class ProtobufUtil {
public static ClientProtos.Result toResultNoData(final Result result) {
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setAssociatedCellCount(result.size());
+ if (result.getExists() != null){
+ builder.setExists(result.getExists());
+ }
return builder.build();
}
@@ -1061,12 +1091,16 @@ public final class ProtobufUtil {
* @return the converted client Result
*/
public static Result toResult(final ClientProtos.Result proto) {
+ if (proto.hasExists()) {
+ return Result.create(null, proto.getExists());
+ }
+
List<CellProtos.Cell> values = proto.getCellList();
List<Cell> cells = new ArrayList<Cell>(values.size());
- for (CellProtos.Cell c: values) {
+ for (CellProtos.Cell c : values) {
cells.add(toCell(c));
}
- return Result.create(cells);
+ return Result.create(cells, null);
}
/**
@@ -1079,6 +1113,10 @@ public final class ProtobufUtil {
*/
public static Result toResult(final ClientProtos.Result proto, final CellScanner scanner)
throws IOException {
+ if (proto.hasExists()){
+ return Result.create(null, proto.getExists());
+ }
+
// TODO: Unit test that has some Cells in scanner and some in the proto.
List<Cell> cells = null;
if (proto.hasAssociatedCellCount()) {
@@ -1094,7 +1132,7 @@ public final class ProtobufUtil {
for (CellProtos.Cell c: values) {
cells.add(toCell(c));
}
- return Result.create(cells);
+ return Result.create(cells, null);
}
/**
@@ -2242,11 +2280,15 @@ public final class ProtobufUtil {
", row=" + getStringForByteString(r.getGet().getRow());
} else if (m instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest r = (ClientProtos.MultiRequest) m;
- ClientProtos.MultiAction action = r.getActionList().get(0);
- return "region= " + getStringForByteString(r.getRegion().getValue()) +
- ", for " + r.getActionCount() +
- " actions and 1st row key=" + getStringForByteString(action.hasMutation() ?
- action.getMutation().getRow() : action.getGet().getRow());
+ // Get first set of Actions.
+ ClientProtos.RegionAction actions = r.getRegionActionList().get(0);
+ String row = actions.getActionCount() <= 0? "":
+ getStringForByteString(actions.getAction(0).hasGet()?
+ actions.getAction(0).getGet().getRow():
+ actions.getAction(0).getMutation().getRow());
+ return "region= " + getStringForByteString(actions.getRegion().getValue()) +
+ ", for " + r.getRegionActionCount() +
+ " actions and 1st row key=" + row;
} else if (m instanceof ClientProtos.MutateRequest) {
ClientProtos.MutateRequest r = (ClientProtos.MutateRequest) m;
return "region= " + getStringForByteString(r.getRegion().getValue()) +
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Sat Oct 5 00:17:11 2013
@@ -63,14 +63,12 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -101,6 +99,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Triple;
+import org.mortbay.log.Log;
import com.google.protobuf.ByteString;
@@ -131,7 +130,6 @@ public final class RequestConverter {
GetRequest.Builder builder = GetRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
- builder.setClosestRowBefore(true);
builder.setRegion(region);
Column.Builder columnBuilder = Column.newBuilder();
@@ -140,63 +138,30 @@ public final class RequestConverter {
ClientProtos.Get.newBuilder();
getBuilder.setRow(ByteString.copyFrom(row));
getBuilder.addColumn(columnBuilder.build());
+ getBuilder.setClosestRowBefore(true);
builder.setGet(getBuilder.build());
return builder.build();
}
- /**
- * Create a protocol buffer GetRequest for a client Get
- *
- * @param regionName the name of the region to get
- * @param get the client Get
- * @return a protocol buffer GetReuqest
- */
- public static GetRequest buildGetRequest(final byte[] regionName,
- final Get get) throws IOException {
- return buildGetRequest(regionName, get, false);
- }
/**
* Create a protocol buffer GetRequest for a client Get
*
* @param regionName the name of the region to get
* @param get the client Get
- * @param existenceOnly indicate if check row existence only
* @return a protocol buffer GetRequest
*/
public static GetRequest buildGetRequest(final byte[] regionName,
- final Get get, final boolean existenceOnly) throws IOException {
+ final Get get) throws IOException {
GetRequest.Builder builder = GetRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
- builder.setExistenceOnly(existenceOnly);
builder.setRegion(region);
builder.setGet(ProtobufUtil.toGet(get));
return builder.build();
}
/**
- * Create a protocol buffer MultiGetRequest for client Gets All gets are going to be run against
- * the same region.
- * @param regionName the name of the region to get from
- * @param gets the client Gets
- * @param existenceOnly indicate if check rows existence only
- * @return a protocol buffer MultiGetRequest
- */
- public static MultiGetRequest buildMultiGetRequest(final byte[] regionName, final List<Get> gets,
- final boolean existenceOnly, final boolean closestRowBefore) throws IOException {
- MultiGetRequest.Builder builder = MultiGetRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
- builder.setExistenceOnly(existenceOnly);
- builder.setClosestRowBefore(closestRowBefore);
- builder.setRegion(region);
- for (Get get : gets) {
- builder.addGet(ProtobufUtil.toGet(get));
- }
- return builder.build();
- }
-
- /**
* Create a protocol buffer MutateRequest for a client increment
*
* @param regionName
@@ -358,17 +323,18 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer MultiRequest for a row mutations
- *
+ * Create a protocol buffer MultiRequest for row mutations.
+ * Does not propagate Action absolute position. Does not set atomic action on the created
+ * RegionAction. Caller should do that if wanted.
* @param regionName
* @param rowMutations
- * @return a multi request
+ * @return a data-laden RegionAction.Builder
* @throws IOException
*/
- public static MultiRequest buildMultiRequest(final byte[] regionName,
+ public static RegionAction.Builder buildRegionAction(final byte [] regionName,
final RowMutations rowMutations)
throws IOException {
- MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
+ RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
@@ -380,25 +346,26 @@ public final class RequestConverter {
mutation.getClass().getName());
}
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
- builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
+ builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
}
- return builder.build();
+ return builder;
}
/**
* Create a protocol buffer MultiRequest for row mutations that does not hold data. Data/Cells
- * are carried outside of protobuf. Return references to the Cells in <code>cells</code> param
- *
+ * are carried outside of protobuf. Return references to the Cells in <code>cells</code> param.
+ * Does not propagate Action absolute position. Does not set atomic action on the created
+ * RegionAction. Caller should do that if wanted.
* @param regionName
* @param rowMutations
* @param cells Return in here a list of Cells as CellIterable.
- * @return a multi request minus data
+ * @return a RegionAction.Builder minus data
* @throws IOException
*/
- public static MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+ public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
final RowMutations rowMutations, final List<CellScannable> cells)
throws IOException {
- MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
+ RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type = null;
if (mutation instanceof Put) {
@@ -411,17 +378,16 @@ public final class RequestConverter {
}
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
cells.add(mutation);
- builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
+ builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
}
- return builder.build();
+ return builder;
}
- private static MultiRequest.Builder getMultiRequestBuilderWithRegionAndAtomicSet(final byte [] regionName,
- final boolean atomic) {
- MultiRequest.Builder builder = MultiRequest.newBuilder();
+ private static RegionAction.Builder getRegionActionBuilderWithRegion(final byte [] regionName) {
+ RegionAction.Builder builder = RegionAction.newBuilder();
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- return builder.setAtomic(atomic);
+ return builder;
}
/**
@@ -510,39 +476,43 @@ public final class RequestConverter {
/**
* Create a protocol buffer multi request for a list of actions.
- * RowMutations in the list (if any) will be ignored.
+ * Propagates Actions original index.
*
* @param regionName
* @param actions
* @return a multi request
* @throws IOException
*/
- public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
+ public static <R> RegionAction.Builder buildRegionAction(final byte[] regionName,
final List<Action<R>> actions)
throws IOException {
- MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
+ RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
for (Action<R> action: actions) {
- MultiAction.Builder protoAction = MultiAction.newBuilder();
Row row = action.getAction();
+ ClientProtos.Action.Builder actionBuilder =
+ ClientProtos.Action.newBuilder().setIndex(action.getOriginalIndex());
if (row instanceof Get) {
- protoAction.setGet(ProtobufUtil.toGet((Get)row));
+ Get g = (Get)row;
+ builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
- protoAction.setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row)));
} else if (row instanceof Delete) {
- protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row)));
} else if (row instanceof Append) {
- protoAction.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row)));
} else if (row instanceof Increment) {
- protoAction.setMutation(ProtobufUtil.toMutation((Increment)row));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation((Increment)row)));
} else if (row instanceof RowMutations) {
- continue; // ignore RowMutations
+ throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
} else {
- throw new DoNotRetryIOException(
- "multi doesn't support " + row.getClass().getName());
+ throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
- builder.addAction(protoAction.build());
}
- return builder.build();
+ return builder;
}
/**
@@ -553,7 +523,7 @@ public final class RequestConverter {
* carried by protobuf. We return references to the data by adding them to the passed in
* <code>data</code> param.
*
- * RowMutations in the list (if any) will be ignored.
+ * <p>Propagates Actions original index.
*
* @param regionName
* @param actions
@@ -561,20 +531,22 @@ public final class RequestConverter {
* @return a multi request that does not carry any data.
* @throws IOException
*/
- public static <R> MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+ public static <R> RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
final List<Action<R>> actions, final List<CellScannable> cells)
throws IOException {
- MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
+ RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
for (Action<R> action: actions) {
- MultiAction.Builder protoAction = MultiAction.newBuilder();
Row row = action.getAction();
+ ClientProtos.Action.Builder actionBuilder =
+ ClientProtos.Action.newBuilder().setIndex(action.getOriginalIndex());
if (row instanceof Get) {
- // Gets are carried by protobufs.
- protoAction.setGet(ProtobufUtil.toGet((Get)row));
+ Get g = (Get)row;
+ builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
Put p = (Put)row;
cells.add(p);
- protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p)));
} else if (row instanceof Delete) {
Delete d = (Delete)row;
int size = d.size();
@@ -585,26 +557,29 @@ public final class RequestConverter {
// metadata only in the pb and then send the kv along the side in cells.
if (size > 0) {
cells.add(d);
- protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d)));
} else {
- protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d)));
}
} else if (row instanceof Append) {
Append a = (Append)row;
cells.add(a);
- protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a)));
} else if (row instanceof Increment) {
Increment i = (Increment)row;
cells.add(i);
- protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i));
+ builder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i)));
} else if (row instanceof RowMutations) {
continue; // ignore RowMutations
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
- builder.addAction(protoAction.build());
}
- return builder.build();
+ return builder;
}
// End utilities for Client
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Sat Oct 5 00:17:11 2013
@@ -39,14 +39,19 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.ByteString;
@@ -68,27 +73,58 @@ public final class ResponseConverter {
/**
* Get the results from a protocol buffer MultiResponse
*
- * @param proto the protocol buffer MultiResponse to convert
+ * @param request the protocol buffer MultiRequest that the response being converted answers
* @param cells Cells to go with the passed in <code>proto</code>. Can be null.
* @return the results that were in the MultiResponse (a Result or an Exception).
* @throws IOException
*/
- public static List<Object> getResults(final ClientProtos.MultiResponse proto,
- final CellScanner cells)
+ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
+ final MultiResponse response, final CellScanner cells)
throws IOException {
- List<Object> results = new ArrayList<Object>();
- List<ActionResult> resultList = proto.getResultList();
- for (int i = 0, n = resultList.size(); i < n; i++) {
- ActionResult result = resultList.get(i);
- if (result.hasException()) {
- results.add(ProtobufUtil.toException(result.getException()));
- } else if (result.hasValue()) {
- ClientProtos.Result value = result.getValue();
- results.add(ProtobufUtil.toResult(value, cells));
- } else {
- results.add(new Result());
+ int requestRegionActionCount = request.getRegionActionCount();
+ int responseRegionActionResultCount = response.getRegionActionResultCount();
+ if (requestRegionActionCount != responseRegionActionResultCount) {
+ throw new IllegalStateException("Request mutation count=" + requestRegionActionCount +
+ " does not match response mutation result count=" + responseRegionActionResultCount);
+ }
+
+ org.apache.hadoop.hbase.client.MultiResponse results =
+ new org.apache.hadoop.hbase.client.MultiResponse();
+
+ for (int i = 0; i < responseRegionActionResultCount; i++) {
+ RegionAction actions = request.getRegionAction(i);
+ RegionActionResult actionResult = response.getRegionActionResult(i);
+ byte[] regionName = actions.getRegion().toByteArray();
+
+ if (actionResult.hasException()){
+ Throwable regionException = ProtobufUtil.toException(actionResult.getException());
+ for (ClientProtos.Action a : actions.getActionList()){
+ results.add(regionName, new Pair<Integer, Object>(a.getIndex(), regionException));
+ }
+ continue;
+ }
+
+ if (actions.getActionCount() != actionResult.getResultOrExceptionCount()) {
+ throw new IllegalStateException("actions.getActionCount=" + actions.getActionCount() +
+ ", actionResult.getResultOrExceptionCount=" +
+ actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion());
+ }
+
+ for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
+ if (roe.hasException()) {
+ results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
+ ProtobufUtil.toException(roe.getException())));
+ } else if (roe.hasResult()) {
+ results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
+ ProtobufUtil.toResult(roe.getResult(), cells)));
+ } else {
+ // no result & no exception. Unexpected.
+ throw new IllegalStateException("No result & no exception roe=" + roe +
+ " for region " + actions.getRegion());
+ }
}
}
+
return results;
}
@@ -96,16 +132,36 @@ public final class ResponseConverter {
* Wrap a throwable to an action result.
*
* @param t
- * @return an action result
+ * @return an action result builder
*/
- public static ActionResult buildActionResult(final Throwable t) {
- ActionResult.Builder builder = ActionResult.newBuilder();
+ public static ResultOrException.Builder buildActionResult(final Throwable t) {
+ ResultOrException.Builder builder = ResultOrException.newBuilder();
+ if (t != null) builder.setException(buildException(t));
+ return builder;
+ }
+
+ /**
+ * Wrap a result in an action result.
+ *
+ * @param r
+ * @return an action result builder
+ */
+ public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) {
+ ResultOrException.Builder builder = ResultOrException.newBuilder();
+ if (r != null) builder.setResult(r);
+ return builder;
+ }
+
+ /**
+ * @param t
+ * @return NameBytesPair of the exception name to the stringified version of the exception.
+ */
+ public static NameBytesPair buildException(final Throwable t) {
NameBytesPair.Builder parameterBuilder = NameBytesPair.newBuilder();
parameterBuilder.setName(t.getClass().getName());
parameterBuilder.setValue(
ByteString.copyFromUtf8(StringUtils.stringifyException(t)));
- builder.setException(parameterBuilder.build());
- return builder.build();
+ return parameterBuilder.build();
}
/**
Modified: hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Sat Oct 5 00:17:11 2013
@@ -172,7 +172,7 @@ public class TestAsyncProcess {
*/
static class MyConnectionImpl2 extends MyConnectionImpl {
List<HRegionLocation> hrl;
- boolean usedRegions[];
+ final boolean usedRegions[];
protected MyConnectionImpl2(List<HRegionLocation> hrl) {
super(c);
@@ -186,7 +186,7 @@ public class TestAsyncProcess {
int i = 0;
for (HRegionLocation hr:hrl){
if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
- usedRegions[i] = true;
+ usedRegions[i] = true;
return hr;
}
i++;
@@ -475,9 +475,9 @@ public class TestAsyncProcess {
private class MyCB implements AsyncProcess.AsyncProcessCallback<Object> {
- private AtomicInteger successCalled = new AtomicInteger(0);
- private AtomicInteger failureCalled = new AtomicInteger(0);
- private AtomicInteger retriableFailure = new AtomicInteger(0);
+ private final AtomicInteger successCalled = new AtomicInteger(0);
+ private final AtomicInteger failureCalled = new AtomicInteger(0);
+ private final AtomicInteger retriableFailure = new AtomicInteger(0);
@Override
@@ -705,7 +705,7 @@ public class TestAsyncProcess {
*/
@Test
public void testThreadCreation() throws Exception {
- final int NB_REGS = 10000;
+ final int NB_REGS = 100;
List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
List<Get> gets = new ArrayList<Get>(NB_REGS);
for (int i = 0; i < NB_REGS; i++) {
@@ -721,11 +721,13 @@ public class TestAsyncProcess {
HTable ht = new HTable();
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
ht.connection = con;
- ht.batch(gets);
+
+ ht.batch(gets);
+
Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
- Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server
- Assert.assertEquals(con.nbThreads.get(), 2); // 1 thread per server
+ Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
+ Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
int nbReg = 0;
for (int i =0; i<NB_REGS; i++){
Modified: hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java (original)
+++ hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java Sat Oct 5 00:17:11 2013
@@ -301,4 +301,4 @@ public class TestClientNoCluster {
return this.stub;
}
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (original)
+++ hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java Sat Oct 5 00:17:11 2013
@@ -207,11 +207,13 @@ public final class CellUtil {
@Override
public Cell current() {
+ if (cells == null) return null;
return (index < 0)? null: this.cells[index];
}
@Override
public boolean advance() {
+ if (cells == null) return false;
return ++index < this.cells.length;
}
};
Modified: hbase/branches/0.96/hbase-protocol/README.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-protocol/README.txt?rev=1529355&r1=1529354&r2=1529355&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-protocol/README.txt (original)
+++ hbase/branches/0.96/hbase-protocol/README.txt Sat Oct 5 00:17:11 2013
@@ -25,7 +25,6 @@ terminal and hit return -- the protoc co
do
protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
done
- ll $JAVA_DIR/org/apache/hadoop/hbase/protobuf/generated
After you've done the above, check it in and then check it in (or post a patch
on a JIRA with your definition file changes and the generated files).