You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/12 01:16:50 UTC

[2/3] hbase git commit: HBASE-13606 AssignmentManager.assign() is not sync in both path

HBASE-13606 AssignmentManager.assign() is not sync in both path


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30ecf990
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30ecf990
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30ecf990

Branch: refs/heads/master
Commit: 30ecf990fe2a343e418eedcffd1d8d5c94ab1fd3
Parents: c3f83a9
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon May 11 23:42:11 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue May 12 00:13:29 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/master/AssignmentManager.java  | 89 +++++++++++++++++---
 .../hbase/master/GeneralBulkAssigner.java       | 29 +------
 2 files changed, 80 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/30ecf990/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 4a1e71f..eae9999 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -156,6 +156,7 @@ public class AssignmentManager {
   // bulk assigning may be not as efficient.
   private final int bulkAssignThresholdRegions;
   private final int bulkAssignThresholdServers;
+  private final int bulkPerRegionOpenTimeGuesstimate;
 
   // Should bulk assignment wait till all regions are assigned,
   // or it is timed out?  This is useful to measure bulk assignment
@@ -194,7 +195,7 @@ public class AssignmentManager {
 
   /** Listeners that are called on assignment events. */
   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
-  
+
   private RegionStateListener regionStateListener;
 
   /**
@@ -244,6 +245,8 @@ public class AssignmentManager {
       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+    this.bulkPerRegionOpenTimeGuesstimate =
+      conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
 
     this.metricsAssignmentManager = new MetricsAssignmentManager();
     this.tableLockManager = tableLockManager;
@@ -831,6 +834,18 @@ public class AssignmentManager {
           }
         }
       }
+
+      // wait for assignment completion
+      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
+      for (HRegionInfo region: regions) {
+        if (!region.getTable().isSystemTable()) {
+          userRegionSet.add(region);
+        }
+      }
+      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+            System.currentTimeMillis())) {
+        LOG.debug("some user regions are still in transition: " + userRegionSet);
+      }
       LOG.debug("Bulk assigning done for " + destination);
       return true;
     } finally {
@@ -1349,22 +1364,62 @@ public class AssignmentManager {
    * If the region is already assigned, returns immediately.  Otherwise, method
    * blocks until the region is assigned.
    * @param regionInfo region to wait on assignment for
+   * @return true if the region is assigned, false otherwise.
    * @throws InterruptedException
    */
   public boolean waitForAssignment(HRegionInfo regionInfo)
       throws InterruptedException {
-    while (!regionStates.isRegionOnline(regionInfo)) {
-      if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
-          || this.server.isStopped()) {
-        return false;
-      }
+    ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
+    regionSet.add(regionInfo);
+    return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+  }
 
-      // We should receive a notification, but it's
-      //  better to have a timeout to recheck the condition here:
-      //  it lowers the impact of a race condition if any
-      regionStates.waitForUpdate(100);
+  /**
+   * Waits until the specified regions have completed assignment, or the deadline is reached.
+   */
+  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+      final boolean waitTillAllAssigned, final int reassigningRegions,
+      final long minEndTime) throws InterruptedException {
+    long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+    return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+  }
+
+  /**
+   * Waits until the specified regions have completed assignment, or the deadline is reached.
+   * @param regionSet regions to wait on; the set is modified and the assigned regions are removed
+   * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
+   * @param deadline the timestamp after which the wait is aborted
+   * @return true if all the regions are assigned, false otherwise.
+   * @throws InterruptedException
+   */
+  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+      final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+    while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+      int failedOpenCount = 0;
+      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+      while (regionInfoIterator.hasNext()) {
+        HRegionInfo hri = regionInfoIterator.next();
+        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+          regionInfoIterator.remove();
+        } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+          failedOpenCount++;
+        }
+      }
+      if (!waitTillAllAssigned) {
+        // No need to wait; let the assignment go on asynchronously
+        break;
+      }
+      if (!regionSet.isEmpty()) {
+        if (failedOpenCount == regionSet.size()) {
+          // all the regions we are waiting had an error on open.
+          break;
+        }
+        regionStates.waitForUpdate(100);
+      }
     }
-    return true;
+    return regionSet.isEmpty();
   }
 
   /**
@@ -1453,15 +1508,27 @@ public class AssignmentManager {
         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
           " region(s) to " + servers + " server(s)");
       }
+
+      // invoke assignment (async)
+      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
           for (HRegionInfo region: plan.getValue()) {
             if (!regionStates.isRegionOnline(region)) {
               invokeAssign(region);
+              if (!region.getTable().isSystemTable()) {
+                userRegionSet.add(region);
+              }
             }
           }
         }
       }
+
+      // wait for assignment completion
+      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+            System.currentTimeMillis())) {
+        LOG.debug("some user regions are still in transition: " + userRegionSet);
+      }
     } else {
       LOG.info("Bulk assigning " + regions + " region(s) across "
         + totalServers + " server(s), " + message);

http://git-wip-us.apache.org/repos/asf/hbase/blob/30ecf990/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
index 356f4af..43ea523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.RegionState.State;
 
 /**
  * Run bulk assign.  Does one RCP per regionserver passing a
@@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner {
     if (!failedPlans.isEmpty() && !server.isStopped()) {
       reassigningRegions = reassignFailedPlans();
     }
-
-    Configuration conf = server.getConfiguration();
-    long perRegionOpenTimeGuesstimate =
-      conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
-    long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
-      + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
-    RegionStates regionStates = assignmentManager.getRegionStates();
-    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
-    while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
-      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
-      while (regionInfoIterator.hasNext()) {
-        HRegionInfo hri = regionInfoIterator.next();
-        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
-            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
-          regionInfoIterator.remove();
-        }
-      }
-      if (!waitTillAllAssigned) {
-        // No need to wait, let assignment going on asynchronously
-        break;
-      }
-      if (!regionSet.isEmpty()) {
-        regionStates.waitForUpdate(100);
-      }
-    }
+    assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
+      reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
 
     if (LOG.isDebugEnabled()) {
       long elapsedTime = System.currentTimeMillis() - startTime;