You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/19 02:59:49 UTC
svn commit: r986993 - in /hbase/branches/0.90_master_rewrite/src:
main/java/org/apache/hadoop/hbase/executor/
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/master/handler/
test/java/org/apache/hadoop/hbase/master/
Author: stack
Date: Thu Aug 19 00:59:48 2010
New Revision: 986993
URL: http://svn.apache.org/viewvc?rev=986993&view=rev
Log:
Made listeners for EventHandlers non-static
M src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
Working on fixing this test -- not done yet.
M src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
Logging fixup and a bug fix (set the state in RegionState when handling a close).
M src/main/java/org/apache/hadoop/hbase/master/HMaster.java
Make executor service and assignment manager package private so they can
be gotten at by tests.
M src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
Formatting.
M src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
Redid listener mechanism so it didn't rely on statics to work.
M src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
Added support for listeners.
Modified:
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Thu Aug 19 00:59:48 2010
@@ -19,9 +19,6 @@
*/
package org.apache.hadoop.hbase.executor;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -50,14 +47,15 @@ public abstract class EventHandler imple
protected EventType eventType;
// server controller
protected Server server;
- // listeners that are called before and after an event is processed
- protected static List<EventHandlerListener> eventHandlerListeners =
- Collections.synchronizedList(new ArrayList<EventHandlerListener>());
+
// sequence id generator for default FIFO ordering of events
protected static AtomicLong seqids = new AtomicLong(0);
// sequence id for this event
protected long seqid;
+ // Listener to call pre- and post- processing.
+ private EventHandlerListener listener;
+
/**
* This interface provides hooks to listen to various events received by the
* queue. A class implementing this can listen to the updates by calling
@@ -188,28 +186,17 @@ public abstract class EventHandler imple
}
/**
- * This is a wrapper around process, used to update listeners before and after
- * events are processed.
+ * This is a wrapper around {@link #process()} to give listeners a chance to run.
*/
public void run() {
- // fire all beforeProcess listeners
- for(EventHandlerListener listener : eventHandlerListeners) {
- listener.beforeProcess(this);
- }
-
+ if (getListener() != null) this.listener.beforeProcess(this);
// call the main process function
try {
process();
} catch(Throwable t) {
LOG.error("Caught throwable while processing event " + eventType, t);
}
-
- // fire all afterProcess listeners
- for(EventHandlerListener listener : eventHandlerListeners) {
- LOG.debug("Firing " + listener.getClass().getName() +
- ".afterProcess event listener for event " + eventType);
- listener.afterProcess(this);
- }
+ if (getListener() != null) this.listener.afterProcess(this);
}
/**
@@ -219,20 +206,6 @@ public abstract class EventHandler imple
public abstract void process();
/**
- * Subscribe to updates before and after processing events
- */
- public static void registerListener(EventHandlerListener listener) {
- eventHandlerListeners.add(listener);
- }
-
- /**
- * Stop receiving updates before and after processing events
- */
- public static void unregisterListener(EventHandlerListener listener) {
- eventHandlerListeners.remove(listener);
- }
-
- /**
* Return the name for this event type.
* @return
*/
@@ -286,4 +259,18 @@ public abstract class EventHandler imple
public void execute() {
this.run();
}
-}
+
+ /**
+ * @return Current listener or null if none set.
+ */
+ public synchronized EventHandlerListener getListener() {
+ return listener;
+ }
+
+ /**
+ * @param listener Listener to call pre- and post- {@link #process()}.
+ */
+ public synchronized void setListener(EventHandlerListener listener) {
+ this.listener = listener;
+ }
+}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java Thu Aug 19 00:59:48 2010
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.executor;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,21 +31,26 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This is a generic executor service. This component abstracts a
- * threadpool, a queue to which jobs can be submitted and a Runnable that
- * handles the object that is added to the queue.
+ * threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
+ * and a <code>Runnable</code> that handles the object that is added to the queue.
*
* <p>In order to create a new service, create an instance of this class and
* then do: <code>instance.startExecutorService("myService");</code>. When done
* call {@link #shutdown()}.
*
* In order to use the service created above, you need to override the
- * <code>EventHandler</code> class and create an event type that submits to this
- * service.
+ * {@link EventHandler} class and create an {@link EventHandler.EventType} that
+ * submits to this service. Register pre- and post- processing listeners
+ * by registering your implementation of {@link EventHandler.EventHandlerListener}
+ * with {@link #registerListener(EventType, EventHandlerListener)}. Be sure
+ * to deregister your listener when done via {@link #unregisterListener(EventType)}.
*/
public class ExecutorService {
private static final Log LOG = LogFactory.getLog(ExecutorService.class);
@@ -52,6 +58,9 @@ public class ExecutorService {
// hold the all the executors created in a map addressable by their names
private final ConcurrentHashMap<String, Executor> executorMap =
new ConcurrentHashMap<String, Executor>();
+ // listeners that are called before and after an event is processed
+ private ConcurrentHashMap<EventHandler.EventType, EventHandlerListener> eventHandlerListeners =
+ new ConcurrentHashMap<EventHandler.EventType, EventHandlerListener>();
private final String servername;
@@ -102,7 +111,7 @@ public class ExecutorService {
throw new RuntimeException("An executor service with the name " + name +
" is already running!");
}
- Executor hbes = new Executor(name, maxThreads);
+ Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
if (this.executorMap.putIfAbsent(name, hbes) != null) {
throw new RuntimeException("An executor service with the name " + name +
" is already running (2)!");
@@ -153,6 +162,29 @@ public class ExecutorService {
}
/**
+ * Subscribe to updates before and after processing instances of
+ * {@link EventHandler.EventType}. Currently only one listener per
+ * event type.
+ * @param type Type of event we're registering listener for
+ * @param listener The listener to run.
+ * @return The <code>listener</code> that was passed
+ */
+ public void registerListener(final EventHandler.EventType type,
+ final EventHandlerListener listener) {
+ this.eventHandlerListeners.put(type, listener);
+ }
+
+ /**
+ * Stop receiving updates before and after processing instances of
+ * {@link EventHandler.EventType}
+ * @param type Type of event we're registering listener for
+ * @return The listener we removed or null if we did not remove it.
+ */
+ public EventHandlerListener unregisterListener(final EventHandler.EventType type) {
+ return this.eventHandlerListeners.remove(type);
+ }
+
+ /**
* Executor instance.
*/
private static class Executor {
@@ -166,9 +198,12 @@ public class ExecutorService {
BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();
private final AtomicInteger threadid = new AtomicInteger(0);
private final String name;
+ private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
- protected Executor(String name, int maxThreads) {
+ protected Executor(String name, int maxThreads,
+ final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
this.name = name;
+ this.eventHandlerListeners = eventHandlerListeners;
// create the thread pool executor
this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
@@ -182,7 +217,14 @@ public class ExecutorService {
* Submit the event to the queue for handling.
* @param event
*/
- void submit(Runnable event) {
+ void submit(final EventHandler event) {
+ // If there is a listener for this type, make sure we call the before
+ // and after process methods.
+ EventHandlerListener listener =
+ this.eventHandlerListeners.get(event.getEventType());
+ if (listener != null) {
+ event.setListener(listener);
+ }
this.threadPoolExecutor.execute(event);
}
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Aug 19 00:59:48 2010
@@ -239,15 +239,15 @@ public class AssignmentManager extends Z
}
String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
- LOG.debug("Handling region transition for server " +
- data.getServerName() + " and region " + prettyPrintedRegionName);
+ LOG.debug("Handling transition=" + data.getEventType() + ", server=" +
+ data.getServerName() + ", region=" + prettyPrintedRegionName);
RegionState regionState = regionsInTransition.get(encodedName);
switch(data.getEventType()) {
case RS2ZK_REGION_CLOSING:
// Should see CLOSING after we have asked it to CLOSE or additional
// times after already being in state of CLOSING
- if(regionState == null ||
+ if (regionState == null ||
(!regionState.isPendingClose() && !regionState.isClosing())) {
LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
" from server " + data.getServerName() + " but region was in " +
@@ -261,7 +261,7 @@ public class AssignmentManager extends Z
case RS2ZK_REGION_CLOSED:
// Should see CLOSED after CLOSING but possible after PENDING_CLOSE
- if(regionState == null ||
+ if (regionState == null ||
(!regionState.isPendingClose() && !regionState.isClosing())) {
LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
" from server " + data.getServerName() + " but region was in " +
@@ -270,6 +270,9 @@ public class AssignmentManager extends Z
return;
}
// Handle CLOSED by assigning elsewhere or stopping if a disable
+ // If we got here all is good. Need to update RegionState -- else
+ // what follows will fail because not in expected state.
+ regionState.update(RegionState.State.CLOSED, data.getStamp());
this.executorService.submit(new ClosedRegionHandler(master,
this, data, regionState.getRegion()));
break;
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Aug 19 00:59:48 2010
@@ -141,7 +141,7 @@ implements HMasterInterface, HMasterRegi
private final ServerManager serverManager;
// manager of assignment nodes in zookeeper
- private final AssignmentManager assignmentManager;
+ final AssignmentManager assignmentManager;
// manager of catalog regions
private final CatalogTracker catalogTracker;
// Cluster status zk tracker and local setter
@@ -156,7 +156,7 @@ implements HMasterInterface, HMasterRegi
private volatile boolean abort = false;
// Instance of the hbase executor service.
- private ExecutorService executorService;
+ ExecutorService executorService;
/**
* Initializes the HMaster. The steps are as follows:
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java Thu Aug 19 00:59:48 2010
@@ -84,7 +84,7 @@ public class ClosedRegionHandler extends
public void process() {
LOG.debug("Handling CLOSED event with data: " + data);
// Check if this table is being disabled or not
- if(assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
+ if (assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
// Disabling so should not be reassigned, just delete the CLOSED node
LOG.debug("Table being disabled so deleting ZK node and removing from " +
"regions in transition, skipping assignment");
@@ -101,7 +101,7 @@ public class ClosedRegionHandler extends
assignmentManager.regionOffline(regionInfo);
return;
}
- // ZK Node is in CLOSED state, assign it (transition to OFFLINE done here)
+ // ZK Node is in CLOSED state, assign it.
assignmentManager.setOffline(regionInfo);
assignmentManager.assign(regionInfo);
}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java?rev=986993&r1=986992&r2=986993&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java Thu Aug 19 00:59:48 2010
@@ -88,10 +88,12 @@ public class TestZKBasedReopenRegion {
@Test (timeout=300000) public void testOpenRegion()
throws Exception {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
- LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
+ LOG.info("Number of region servers = " +
+ cluster.getLiveRegionServerThreads().size());
int rsIdx = 0;
- HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
+ HRegionServer regionServer =
+ TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
Collection<HRegion> regions = regionServer.getOnlineRegions();
HRegion region;
while((region = regions.iterator().next()) != null) {
@@ -106,26 +108,31 @@ public class TestZKBasedReopenRegion {
EventHandlerListener closeListener =
new RegionEventListener(region.getRegionNameAsString(),
- closeEventProcessed, EventType.M2RS_CLOSE_REGION);
- EventHandler.registerListener(closeListener);
+ closeEventProcessed, EventType.RS2ZK_REGION_CLOSED);
+ cluster.getMaster().executorService.
+ registerListener(EventType.RS2ZK_REGION_CLOSED, closeListener);
EventHandlerListener openListener =
new RegionEventListener(region.getRegionNameAsString(),
- reopenEventProcessed, EventType.M2RS_OPEN_REGION);
- EventHandler.registerListener(openListener);
+ reopenEventProcessed, EventType.RS2ZK_REGION_OPENED);
+ cluster.getMaster().executorService.
+ registerListener(EventType.RS2ZK_REGION_OPENED, openListener);
- regionServer.closeRegion(region.getRegionInfo());
+ LOG.info("Unassign " + region.getRegionNameAsString());
+ cluster.getMaster().assignmentManager.unassign(region.getRegionInfo());
synchronized(closeEventProcessed) {
closeEventProcessed.wait(3*60*1000);
}
- if(!closeEventProcessed.get()) {
+
+ if (!closeEventProcessed.get()) {
throw new Exception("Timed out, close event not called on master.");
}
synchronized(reopenEventProcessed) {
reopenEventProcessed.wait(3*60*1000);
}
+
if(!reopenEventProcessed.get()) {
throw new Exception("Timed out, open event not called on master after region close.");
}