You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/19 02:59:49 UTC

svn commit: r986993 - in /hbase/branches/0.90_master_rewrite/src: main/java/org/apache/hadoop/hbase/executor/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/handler/ test/java/org/apache/hadoop/hbase/master/

Author: stack
Date: Thu Aug 19 00:59:48 2010
New Revision: 986993

URL: http://svn.apache.org/viewvc?rev=986993&view=rev
Log:
Made listeners for EventHandlers non-static

M src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
  Working on fixing this test -- not done yet.
M src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
  Fixup on logging and bug fix (setting state into RegionState when handling a close).
M src/main/java/org/apache/hadoop/hbase/master/HMaster.java
  Make excecutor service and assignment manager package private so can 
  be gotten at by tests.
M src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
  Formatting.
M src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
  Redid listener mechanism so it didn't rely on statics to work.
M src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
  Added support for listeners.

Modified:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Thu Aug 19 00:59:48 2010
@@ -19,9 +19,6 @@
  */
 package org.apache.hadoop.hbase.executor;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -50,14 +47,15 @@ public abstract class EventHandler imple
   protected EventType eventType;
   // server controller
   protected Server server;
-  // listeners that are called before and after an event is processed
-  protected static List<EventHandlerListener> eventHandlerListeners =
-    Collections.synchronizedList(new ArrayList<EventHandlerListener>());
+
   // sequence id generator for default FIFO ordering of events
   protected static AtomicLong seqids = new AtomicLong(0);
   // sequence id for this event
   protected long seqid;
 
+  // Listener to call pre- and post- processing.
+  private EventHandlerListener listener;
+
   /**
    * This interface provides hooks to listen to various events received by the
    * queue. A class implementing this can listen to the updates by calling
@@ -188,28 +186,17 @@ public abstract class EventHandler imple
   }
 
   /**
-   * This is a wrapper around process, used to update listeners before and after
-   * events are processed.
+   * This is a wrapper around {@link #process()} to give listeners a chance to run.
    */
   public void run() {
-    // fire all beforeProcess listeners
-    for(EventHandlerListener listener : eventHandlerListeners) {
-      listener.beforeProcess(this);
-    }
-
+    if (getListener() != null) this.listener.beforeProcess(this);
     // call the main process function
     try {
       process();
     } catch(Throwable t) {
       LOG.error("Caught throwable while processing event " + eventType, t);
     }
-
-    // fire all afterProcess listeners
-    for(EventHandlerListener listener : eventHandlerListeners) {
-      LOG.debug("Firing " + listener.getClass().getName() +
-                ".afterProcess event listener for event " + eventType);
-      listener.afterProcess(this);
-    }
+    if (getListener() != null) this.listener.afterProcess(this);
   }
 
   /**
@@ -219,20 +206,6 @@ public abstract class EventHandler imple
   public abstract void process();
 
   /**
-   * Subscribe to updates before and after processing events
-   */
-  public static void registerListener(EventHandlerListener listener) {
-    eventHandlerListeners.add(listener);
-  }
-
-  /**
-   * Stop receiving updates before and after processing events
-   */
-  public static void unregisterListener(EventHandlerListener listener) {
-    eventHandlerListeners.remove(listener);
-  }
-
-  /**
    * Return the name for this event type.
    * @return
    */
@@ -286,4 +259,18 @@ public abstract class EventHandler imple
   public void execute() {
     this.run();
   }
-}
+
+  /**
+   * @return Current listener or null if none set.
+   */
+  public synchronized EventHandlerListener getListener() {
+    return listener;
+  }
+
+  /**
+   * @param listener Listener to call pre- and post- {@link #process()}.
+   */
+  public synchronized void setListener(EventHandlerListener listener) {
+    this.listener = listener;
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java Thu Aug 19 00:59:48 2010
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.executor;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,21 +31,26 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * This is a generic executor service. This component abstracts a
- * threadpool, a queue to which jobs can be submitted and a Runnable that
- * handles the object that is added to the queue.
+ * threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
+ * and a <code>Runnable</code> that handles the object that is added to the queue.
  *
  * <p>In order to create a new service, create an instance of this class and 
  * then do: <code>instance.startExecutorService("myService");</code>.  When done
  * call {@link #shutdown()}.
  *
  * In order to use the service created above, you need to override the
- * <code>EventHandler</code> class and create an event type that submits to this
- * service.
+ * {@link EventHandler} class and create an {@link EventHandler.EventType} that
+ * submits to this service.  Register pre- and post- processing listeners
+ * by registering your implementation of {@link EventHandler.EventHandlerListener}
+ * with {@link #registerListener(EventType, EventHandlerListener)}.  Be sure
+ * to deregister your listener when done via {@link #unregisterListener(EventType)}.
  */
 public class ExecutorService {
   private static final Log LOG = LogFactory.getLog(ExecutorService.class);
@@ -52,6 +58,9 @@ public class ExecutorService {
   // hold the all the executors created in a map addressable by their names
   private final ConcurrentHashMap<String, Executor> executorMap =
     new ConcurrentHashMap<String, Executor>();
+  // listeners that are called before and after an event is processed
+  private ConcurrentHashMap<EventHandler.EventType, EventHandlerListener> eventHandlerListeners =
+    new ConcurrentHashMap<EventHandler.EventType, EventHandlerListener>();
 
   private final String servername;
 
@@ -102,7 +111,7 @@ public class ExecutorService {
       throw new RuntimeException("An executor service with the name " + name +
         " is already running!");
     }
-    Executor hbes = new Executor(name, maxThreads);
+    Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
     if (this.executorMap.putIfAbsent(name, hbes) != null) {
       throw new RuntimeException("An executor service with the name " + name +
       " is already running (2)!");
@@ -153,6 +162,29 @@ public class ExecutorService {
   }
 
   /**
+   * Subscribe to updates before and after processing instances of
+   * {@link EventHandler.EventType}.  Currently only one listener per
+   * event type.
+   * @param type Type of event we're registering listener for
+   * @param listener The listener to run.
+   * @return The <code>listener</code> that was passed
+   */
+  public void registerListener(final EventHandler.EventType type,
+      final EventHandlerListener listener) {
+    this.eventHandlerListeners.put(type, listener);
+  }
+
+  /**
+   * Stop receiving updates before and after processing instances of
+   * {@link EventHandler.EventType}
+   * @param type Type of event we're registering listener for
+   * @return The listener we removed or null if we did not remove it.
+   */
+  public EventHandlerListener unregisterListener(final EventHandler.EventType type) {
+    return this.eventHandlerListeners.remove(type);
+  }
+
+  /**
    * Executor instance.
    */
   private static class Executor {
@@ -166,9 +198,12 @@ public class ExecutorService {
     BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();
     private final AtomicInteger threadid = new AtomicInteger(0);
     private final String name;
+    private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
 
-    protected Executor(String name, int maxThreads) {
+    protected Executor(String name, int maxThreads,
+        final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
       this.name = name;
+      this.eventHandlerListeners = eventHandlerListeners;
       // create the thread pool executor
       this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
@@ -182,7 +217,14 @@ public class ExecutorService {
      * Submit the event to the queue for handling.
      * @param event
      */
-    void submit(Runnable event) {
+    void submit(final EventHandler event) {
+      // If there is a listener for this type, make sure we call the before
+      // and after process methods.
+      EventHandlerListener listener =
+        this.eventHandlerListeners.get(event.getEventType());
+      if (listener != null) {
+        event.setListener(listener);
+      }
       this.threadPoolExecutor.execute(event);
     }
   }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Aug 19 00:59:48 2010
@@ -239,15 +239,15 @@ public class AssignmentManager extends Z
       }
       String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
       String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
-      LOG.debug("Handling region transition for server " +
-        data.getServerName() + " and region " + prettyPrintedRegionName);
+      LOG.debug("Handling transition=" + data.getEventType() + ", server=" +
+        data.getServerName() + ", region=" + prettyPrintedRegionName);
       RegionState regionState = regionsInTransition.get(encodedName);
       switch(data.getEventType()) {
 
         case RS2ZK_REGION_CLOSING:
           // Should see CLOSING after we have asked it to CLOSE or additional
           // times after already being in state of CLOSING
-          if(regionState == null ||
+          if (regionState == null ||
               (!regionState.isPendingClose() && !regionState.isClosing())) {
             LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
                 " from server " + data.getServerName() + " but region was in " +
@@ -261,7 +261,7 @@ public class AssignmentManager extends Z
 
         case RS2ZK_REGION_CLOSED:
           // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
-          if(regionState == null ||
+          if (regionState == null ||
               (!regionState.isPendingClose() && !regionState.isClosing())) {
             LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
                 " from server " + data.getServerName() + " but region was in " +
@@ -270,6 +270,9 @@ public class AssignmentManager extends Z
             return;
           }
           // Handle CLOSED by assigning elsewhere or stopping if a disable
+          // If we got here all is good.  Need to update RegionState -- else
+          // what follows will fail because not in expected state.
+          regionState.update(RegionState.State.CLOSED, data.getStamp());
           this.executorService.submit(new ClosedRegionHandler(master,
             this, data, regionState.getRegion()));
           break;

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Aug 19 00:59:48 2010
@@ -141,7 +141,7 @@ implements HMasterInterface, HMasterRegi
   private final ServerManager serverManager;
 
   // manager of assignment nodes in zookeeper
-  private final AssignmentManager assignmentManager;
+  final AssignmentManager assignmentManager;
   // manager of catalog regions
   private final CatalogTracker catalogTracker;
   // Cluster status zk tracker and local setter
@@ -156,7 +156,7 @@ implements HMasterInterface, HMasterRegi
   private volatile boolean abort = false;
 
   // Instance of the hbase executor service.
-  private ExecutorService executorService;
+  ExecutorService executorService;
 
   /**
    * Initializes the HMaster. The steps are as follows:

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java Thu Aug 19 00:59:48 2010
@@ -84,7 +84,7 @@ public class ClosedRegionHandler extends
   public void process() {
     LOG.debug("Handling CLOSED event with data: " + data);
     // Check if this table is being disabled or not
-    if(assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
+    if (assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
       // Disabling so should not be reassigned, just delete the CLOSED node
       LOG.debug("Table being disabled so deleting ZK node and removing from " +
           "regions in transition, skipping assignment");
@@ -101,7 +101,7 @@ public class ClosedRegionHandler extends
       assignmentManager.regionOffline(regionInfo);
       return;
     }
-    // ZK Node is in CLOSED state, assign it (transition to OFFLINE done here)
+    // ZK Node is in CLOSED state, assign it.
     assignmentManager.setOffline(regionInfo);
     assignmentManager.assign(regionInfo);
   }

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java Thu Aug 19 00:59:48 2010
@@ -88,10 +88,12 @@ public class TestZKBasedReopenRegion {
   @Test (timeout=300000) public void testOpenRegion()
   throws Exception {
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
+    LOG.info("Number of region servers = " +
+      cluster.getLiveRegionServerThreads().size());
 
     int rsIdx = 0;
-    HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
+    HRegionServer regionServer =
+      TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
     Collection<HRegion> regions = regionServer.getOnlineRegions();
     HRegion region;
     while((region = regions.iterator().next()) != null) {
@@ -106,26 +108,31 @@ public class TestZKBasedReopenRegion {
 
     EventHandlerListener closeListener =
       new RegionEventListener(region.getRegionNameAsString(),
-          closeEventProcessed, EventType.M2RS_CLOSE_REGION);
-    EventHandler.registerListener(closeListener);
+          closeEventProcessed, EventType.RS2ZK_REGION_CLOSED);
+    cluster.getMaster().executorService.
+      registerListener(EventType.RS2ZK_REGION_CLOSED, closeListener);
 
     EventHandlerListener openListener =
       new RegionEventListener(region.getRegionNameAsString(),
-          reopenEventProcessed, EventType.M2RS_OPEN_REGION);
-    EventHandler.registerListener(openListener);
+          reopenEventProcessed, EventType.RS2ZK_REGION_OPENED);
+    cluster.getMaster().executorService.
+      registerListener(EventType.RS2ZK_REGION_OPENED, openListener);
 
-    regionServer.closeRegion(region.getRegionInfo());
+    LOG.info("Unassign " + region.getRegionNameAsString());
+    cluster.getMaster().assignmentManager.unassign(region.getRegionInfo());
 
     synchronized(closeEventProcessed) {
       closeEventProcessed.wait(3*60*1000);
     }
-    if(!closeEventProcessed.get()) {
+
+    if (!closeEventProcessed.get()) {
       throw new Exception("Timed out, close event not called on master.");
     }
 
     synchronized(reopenEventProcessed) {
       reopenEventProcessed.wait(3*60*1000);
     }
+
     if(!reopenEventProcessed.get()) {
       throw new Exception("Timed out, open event not called on master after region close.");
     }