You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/31 07:47:57 UTC

svn commit: r1344569 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java

Author: stack
Date: Thu May 31 05:47:56 2012
New Revision: 1344569

URL: http://svn.apache.org/viewvc?rev=1344569&view=rev
Log:
HBASE-5970 Improve the AssignmentManager#updateTimer and speed up handling opened event

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1344569&r1=1344568&r2=1344569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu May 31 05:47:56 2012
@@ -116,6 +116,8 @@ public class AssignmentManager extends Z
 
   private TimeoutMonitor timeoutMonitor;
 
+  private TimerUpdater timerUpdater;
+
   private LoadBalancer balancer;
 
   /**
@@ -161,6 +163,13 @@ public class AssignmentManager extends Z
     new TreeMap<ServerName, Set<HRegionInfo>>();
 
   /**
+   * Contains the server which need to update timer, these servers will be
+   * handled by {@link TimerUpdater}
+   */
+  private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer = 
+    new ConcurrentSkipListSet<ServerName>();
+
+  /**
    * Region to server assignment map.
    * Contains the server a given region is currently assigned to.
    * This Map and {@link #servers} are tied.  Always update this in tandem
@@ -218,6 +227,10 @@ public class AssignmentManager extends Z
       conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
       master, serverManager,
       conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
+    this.timerUpdater = new TimerUpdater(conf.getInt(
+        "hbase.master.assignment.timerupdater.period", 10000), master);
+    Threads.setDaemonThreadRunning(timerUpdater.getThread(),
+        master.getServerName() + ".timerUpdater");
     this.zkTable = new ZKTable(this.master.getZooKeeper());
     this.maximumAssignmentAttempts =
       this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
@@ -1225,8 +1238,17 @@ public class AssignmentManager extends Z
     }
     // Remove plan if one.
     clearRegionPlan(regionInfo);
-    // Update timers for all regions in transition going against this server.
-    updateTimers(sn);
+    // Add the server to serversInUpdatingTimer
+    addToServersInUpdatingTimer(sn);
+  }
+
+  /**
+   * Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
+   * will update timers for this server in background
+   * @param sn
+   */
+  private void addToServersInUpdatingTimer(final ServerName sn) {
+    this.serversInUpdatingTimer.add(sn);
   }
 
   /**
@@ -2906,6 +2928,35 @@ public class AssignmentManager extends Z
   }
 
   /**
+   * Update timers for all regions in transition going against the server in the
+   * serversInUpdatingTimer.
+   */
+  public class TimerUpdater extends Chore {
+
+    public TimerUpdater(final int period, final Stoppable stopper) {
+      super("AssignmentTimerUpdater", period, stopper);
+    }
+
+    @Override
+    protected void chore() {
+      ServerName serverToUpdateTimer = null;
+      while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
+        if (serverToUpdateTimer == null) {
+          serverToUpdateTimer = serversInUpdatingTimer.first();
+        } else {
+          serverToUpdateTimer = serversInUpdatingTimer
+              .higher(serverToUpdateTimer);
+        }
+        if (serverToUpdateTimer == null) {
+          break;
+        }
+        updateTimers(serverToUpdateTimer);
+        serversInUpdatingTimer.remove(serverToUpdateTimer);
+      }
+    }
+  }
+
+  /**
    * Monitor to check for time outs on region transition operations
    */
   public class TimeoutMonitor extends Chore {
@@ -3449,6 +3500,7 @@ public class AssignmentManager extends Z
 
   public void stop() {
     this.timeoutMonitor.interrupt();
+    this.timerUpdater.interrupt();
   }
   
   /**