You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/11/19 14:39:50 UTC

svn commit: r1543428 - in /hbase/branches/0.96/hbase-client/src: main/java/org/apache/hadoop/hbase/client/AsyncProcess.java test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java

Author: nkeywal
Date: Tue Nov 19 13:39:50 2013
New Revision: 1543428

URL: http://svn.apache.org/r1543428
Log:
HBASE-9988 DOn't use HRI#getEncodedName in the client

Modified:
    hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java

Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1543428&r1=1543427&r2=1543428&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Tue Nov 19 13:39:50 2013
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.cloudera.htrace.Trace;
@@ -95,13 +97,12 @@ class AsyncProcess<CResult> {
   protected final ExecutorService pool;
   protected final AsyncProcessCallback<CResult> callback;
   protected final BatchErrors errors = new BatchErrors();
-  protected final BatchErrors retriedErrors = new BatchErrors();
   protected final AtomicBoolean hasError = new AtomicBoolean(false);
   protected final AtomicLong tasksSent = new AtomicLong(0);
   protected final AtomicLong tasksDone = new AtomicLong(0);
   protected final AtomicLong retriesCnt = new AtomicLong(0);
-  protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
-      new ConcurrentHashMap<String, AtomicInteger>();
+  protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
+      new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
       new ConcurrentHashMap<ServerName, AtomicInteger>();
 
@@ -289,7 +290,7 @@ class AsyncProcess<CResult> {
 
       // Remember the previous decisions about regions or region servers we put in the
       //  final multi.
-      Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
+      Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
 
       int posInList = -1;
@@ -369,15 +370,14 @@ class AsyncProcess<CResult> {
    * We're taking into account the past decision; if we have already accepted
    * operation on a given region, we accept all operations for this region.
    *
-   *
    * @param loc; the region and the server name we want to use.
    * @return true if this region is considered as busy.
    */
   protected boolean canTakeOperation(HRegionLocation loc,
-                                     Map<String, Boolean> regionsIncluded,
+                                     Map<Long, Boolean> regionsIncluded,
                                      Map<ServerName, Boolean> serversIncluded) {
-    String encodedRegionName = loc.getRegionInfo().getEncodedName();
-    Boolean regionPrevious = regionsIncluded.get(encodedRegionName);
+    long regionId = loc.getRegionInfo().getRegionId();
+    Boolean regionPrevious = regionsIncluded.get(regionId);
 
     if (regionPrevious != null) {
       // We already know what to do with this region.
@@ -387,14 +387,14 @@ class AsyncProcess<CResult> {
     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
     if (Boolean.FALSE.equals(serverPrevious)) {
       // It's a new region, on a region server that we have already excluded.
-      regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+      regionsIncluded.put(regionId, Boolean.FALSE);
       return false;
     }
 
-    AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+    AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
       // Too many tasks on this region already.
-      regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+      regionsIncluded.put(regionId, Boolean.FALSE);
       return false;
     }
 
@@ -417,7 +417,7 @@ class AsyncProcess<CResult> {
       }
 
       if (!ok) {
-        regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+        regionsIncluded.put(regionId, Boolean.FALSE);
         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
         return false;
       }
@@ -427,7 +427,7 @@ class AsyncProcess<CResult> {
       assert serverPrevious.equals(Boolean.TRUE);
     }
 
-    regionsIncluded.put(encodedRegionName, Boolean.TRUE);
+    regionsIncluded.put(regionId, Boolean.TRUE);
 
     return true;
   }
@@ -582,18 +582,18 @@ class AsyncProcess<CResult> {
     if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
       canRetry = false;
     }
-    byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
 
+    byte[] region = null;
     if (canRetry && callback != null) {
+      region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
       canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
     }
 
-    if (canRetry) {
-      if (LOG.isTraceEnabled()) {
-        retriedErrors.add(throwable, row, location);
-      }
-    } else {
+    if (!canRetry) {
       if (callback != null) {
+        if (region == null && location != null) {
+          region = location.getRegionInfo().getEncodedNameAsBytes();
+        }
         callback.failure(originalIndex, region, row, throwable);
       }
       errors.add(throwable, row, location);
@@ -875,7 +875,6 @@ class AsyncProcess<CResult> {
    */
   public void clearErrors() {
     errors.clear();
-    retriedErrors.clear();
     hasError.set(false);
   }
 
@@ -897,11 +896,13 @@ class AsyncProcess<CResult> {
     serverCnt.incrementAndGet();
 
     for (byte[] regBytes : regions) {
-      String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
-      AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
       if (regionCnt == null) {
-        taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
-        regionCnt = taskCounterPerRegion.get(encodedRegionName);
+        regionCnt = new AtomicInteger();
+        AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
+        if (oldCnt != null) {
+          regionCnt = oldCnt;
+        }
       }
       regionCnt.incrementAndGet();
     }
@@ -912,8 +913,7 @@ class AsyncProcess<CResult> {
    */
   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
     for (byte[] regBytes : regions) {
-      String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
-      AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
       regionCnt.decrementAndGet();
     }
 

Modified: hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1543428&r1=1543427&r2=1543428&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Tue Nov 19 13:39:50 2013
@@ -60,11 +60,12 @@ public class TestAsyncProcess {
 
   private static ServerName sn = new ServerName("localhost:10,1254");
   private static ServerName sn2 = new ServerName("localhost:140,12540");
-  private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
+  private static HRegionInfo hri1 =
+      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
   private static HRegionInfo hri2 =
-      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
+      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
   private static HRegionInfo hri3 =
-      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW);
+      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
   private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
   private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
   private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
@@ -264,7 +265,7 @@ public class TestAsyncProcess {
     puts.add(createPut(2, true)); // <== new region, but the rs is ok
 
     ap.submit(puts, false);
-    Assert.assertEquals(1, puts.size());
+    Assert.assertEquals(" puts=" + puts, 1, puts.size());
 
     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
     ap.submit(puts, false);
@@ -338,7 +339,7 @@ public class TestAsyncProcess {
     final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
     ap.tasksSent.incrementAndGet();
     final AtomicInteger ai = new AtomicInteger(1);
-    ap.taskCounterPerRegion.put(hri1.getEncodedName(), ai);
+    ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
 
     final AtomicBoolean checkPoint = new AtomicBoolean(false);
     final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
@@ -716,7 +717,7 @@ public class TestAsyncProcess {
     List<Get> gets = new ArrayList<Get>(NB_REGS);
     for (int i = 0; i < NB_REGS; i++) {
       HRegionInfo hri = new HRegionInfo(
-          DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L));
+          DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
       HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
       hrls.add(hrl);