You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/11/19 14:38:49 UTC
svn commit: r1543427 - in /hbase/trunk/hbase-client/src:
main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Author: nkeywal
Date: Tue Nov 19 13:38:49 2013
New Revision: 1543427
URL: http://svn.apache.org/r1543427
Log:
HBASE-9988 Don't use HRI#getEncodedName in the client
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1543427&r1=1543426&r2=1543427&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Tue Nov 19 13:38:49 2013
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.cloudera.htrace.Trace;
@@ -95,13 +97,12 @@ class AsyncProcess<CResult> {
protected final ExecutorService pool;
protected final AsyncProcessCallback<CResult> callback;
protected final BatchErrors errors = new BatchErrors();
- protected final BatchErrors retriedErrors = new BatchErrors();
protected final AtomicBoolean hasError = new AtomicBoolean(false);
protected final AtomicLong tasksSent = new AtomicLong(0);
protected final AtomicLong tasksDone = new AtomicLong(0);
protected final AtomicLong retriesCnt = new AtomicLong(0);
- protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
- new ConcurrentHashMap<String, AtomicInteger>();
+ protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
+ new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
new ConcurrentHashMap<ServerName, AtomicInteger>();
@@ -290,7 +291,7 @@ class AsyncProcess<CResult> {
// Remember the previous decisions about regions or region servers we put in the
// final multi.
- Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
+ Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
int posInList = -1;
@@ -376,15 +377,14 @@ class AsyncProcess<CResult> {
* We're taking into account the past decision; if we have already accepted
* operation on a given region, we accept all operations for this region.
*
- *
* @param loc; the region and the server name we want to use.
* @return true if this region is considered as busy.
*/
protected boolean canTakeOperation(HRegionLocation loc,
- Map<String, Boolean> regionsIncluded,
+ Map<Long, Boolean> regionsIncluded,
Map<ServerName, Boolean> serversIncluded) {
- String encodedRegionName = loc.getRegionInfo().getEncodedName();
- Boolean regionPrevious = regionsIncluded.get(encodedRegionName);
+ long regionId = loc.getRegionInfo().getRegionId();
+ Boolean regionPrevious = regionsIncluded.get(regionId);
if (regionPrevious != null) {
// We already know what to do with this region.
@@ -394,14 +394,14 @@ class AsyncProcess<CResult> {
Boolean serverPrevious = serversIncluded.get(loc.getServerName());
if (Boolean.FALSE.equals(serverPrevious)) {
// It's a new region, on a region server that we have already excluded.
- regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+ regionsIncluded.put(regionId, Boolean.FALSE);
return false;
}
- AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+ AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
// Too many tasks on this region already.
- regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+ regionsIncluded.put(regionId, Boolean.FALSE);
return false;
}
@@ -424,7 +424,7 @@ class AsyncProcess<CResult> {
}
if (!ok) {
- regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+ regionsIncluded.put(regionId, Boolean.FALSE);
serversIncluded.put(loc.getServerName(), Boolean.FALSE);
return false;
}
@@ -434,7 +434,7 @@ class AsyncProcess<CResult> {
assert serverPrevious.equals(Boolean.TRUE);
}
- regionsIncluded.put(encodedRegionName, Boolean.TRUE);
+ regionsIncluded.put(regionId, Boolean.TRUE);
return true;
}
@@ -597,18 +597,18 @@ class AsyncProcess<CResult> {
if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
canRetry = false;
}
- byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
+ byte[] region = null;
if (canRetry && callback != null) {
+ region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
}
- if (canRetry) {
- if (LOG.isTraceEnabled()) {
- retriedErrors.add(throwable, row, location);
- }
- } else {
+ if (!canRetry) {
if (callback != null) {
+ if (region == null && location != null) {
+ region = location.getRegionInfo().getEncodedNameAsBytes();
+ }
callback.failure(originalIndex, region, row, throwable);
}
errors.add(throwable, row, location);
@@ -890,7 +890,6 @@ class AsyncProcess<CResult> {
*/
public void clearErrors() {
errors.clear();
- retriedErrors.clear();
hasError.set(false);
}
@@ -912,11 +911,13 @@ class AsyncProcess<CResult> {
serverCnt.incrementAndGet();
for (byte[] regBytes : regions) {
- String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
- AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+ AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
if (regionCnt == null) {
- taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
- regionCnt = taskCounterPerRegion.get(encodedRegionName);
+ regionCnt = new AtomicInteger();
+ AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
+ if (oldCnt != null) {
+ regionCnt = oldCnt;
+ }
}
regionCnt.incrementAndGet();
}
@@ -927,8 +928,7 @@ class AsyncProcess<CResult> {
*/
protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
for (byte[] regBytes : regions) {
- String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
- AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+ AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
regionCnt.decrementAndGet();
}
Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1543427&r1=1543426&r2=1543427&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Tue Nov 19 13:38:49 2013
@@ -60,11 +60,12 @@ public class TestAsyncProcess {
private static ServerName sn = new ServerName("localhost:10,1254");
private static ServerName sn2 = new ServerName("localhost:140,12540");
- private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
+ private static HRegionInfo hri1 =
+ new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
private static HRegionInfo hri2 =
- new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
+ new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
private static HRegionInfo hri3 =
- new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW);
+ new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
@@ -264,7 +265,7 @@ public class TestAsyncProcess {
puts.add(createPut(2, true)); // <== new region, but the rs is ok
ap.submit(puts, false);
- Assert.assertEquals(1, puts.size());
+ Assert.assertEquals(" puts=" + puts, 1, puts.size());
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
ap.submit(puts, false);
@@ -338,7 +339,7 @@ public class TestAsyncProcess {
final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
ap.tasksSent.incrementAndGet();
final AtomicInteger ai = new AtomicInteger(1);
- ap.taskCounterPerRegion.put(hri1.getEncodedName(), ai);
+ ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
final AtomicBoolean checkPoint = new AtomicBoolean(false);
final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
@@ -716,7 +717,7 @@ public class TestAsyncProcess {
List<Get> gets = new ArrayList<Get>(NB_REGS);
for (int i = 0; i < NB_REGS; i++) {
HRegionInfo hri = new HRegionInfo(
- DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L));
+ DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
hrls.add(hrl);