You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/10/27 22:56:42 UTC

svn commit: r708341 - /hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java

Author: stack
Date: Mon Oct 27 14:56:42 2008
New Revision: 708341

URL: http://svn.apache.org/viewvc?rev=708341&view=rev
Log:
Clean logging little

Modified:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=708341&r1=708340&r2=708341&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Oct 27 14:56:42 2008
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
@@ -777,12 +778,10 @@
           if (rootRegionAddress == null) {
             try {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Sleeping. Waiting for root region.");
+                LOG.debug("Sleeping " + getPauseTime(tries) +
+                  "ms, waiting for root region.");
               }
               Thread.sleep(getPauseTime(tries));
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Wake. Retry finding root region.");
-              }
             } catch (InterruptedException iex) {
               // continue
             }
@@ -903,6 +902,67 @@
       }
       return null;    
     }
+    
+    public void processBatchOfRows(ArrayList<BatchUpdate> list, byte[] tableName)
+        throws IOException {
+      // See HBASE-748 for pseudo code of this method
+      if (list.isEmpty()) {
+        return;
+      }
+      Collections.sort(list);
+      List<BatchUpdate> tempUpdates = new ArrayList<BatchUpdate>();
+      byte[] currentRegion = getRegionLocation(tableName, list.get(0).getRow(),
+          false).getRegionInfo().getRegionName();
+      byte[] region = currentRegion;
+      boolean isLastRow = false;
+      int tries = 0;
+      for (int i = 0; i < list.size() && tries < numRetries; i++) {
+        BatchUpdate batchUpdate = list.get(i);
+        tempUpdates.add(batchUpdate);
+        isLastRow = (i + 1) == list.size();
+        if (!isLastRow) {
+          region = getRegionLocation(tableName, list.get(i + 1).getRow(), false)
+              .getRegionInfo().getRegionName();
+        }
+        if (!Bytes.equals(currentRegion, region) || isLastRow) {
+          final BatchUpdate[] updates = tempUpdates.toArray(new BatchUpdate[0]);
+          int index = getRegionServerForWithoutRetries(new ServerCallable<Integer>(
+              this, tableName, batchUpdate.getRow()) {
+            public Integer call() throws IOException {
+              int i = server.batchUpdates(location.getRegionInfo()
+                  .getRegionName(), updates);
+              return i;
+            }
+          });
+          if (index != updates.length - 1) {
+            if (tries == numRetries - 1) {
+              throw new RetriesExhaustedException("Some server",
+                  currentRegion, batchUpdate.getRow(), 
+                  tries, new ArrayList<Throwable>());
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("reloading table servers because region " +
+                  "server didn't accept updates ");
+            }
+            // Basic waiting time. If many updates are flushed, tests have shown
+            // that this is barely needed but when commiting 1 update this may
+            // get retried hundreds of times.
+            try {
+              Thread.sleep(getPauseTime(tries));
+              tries++;
+            } catch (InterruptedException e) {
+              // continue
+            }
+            i = i - updates.length + index;
+            region = getRegionLocation(tableName, list.get(i + 1).getRow(),
+                true).getRegionInfo().getRegionName();
+
+          }
+          currentRegion = region;
+          tempUpdates.clear();
+        }
+      }
+    }
 
     void close(boolean stopProxy) {
       if (master != null) {
@@ -921,4 +981,4 @@
       }
     }
   } 
-}
\ No newline at end of file
+}