You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:48:43 UTC

svn commit: r1156914 - /incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Author: esammer
Date: Fri Aug 12 00:48:43 2011
New Revision: 1156914

URL: http://svn.apache.org/viewvc?rev=1156914&view=rev
Log:
- Added a command processor exec service in which to process async commands in the node manager. - Use guava ThreadPoolBuilder to name thread pools for debugging.

Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1156914&r1=1156913&r2=1156914&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Aug 12 00:48:43 2011
@@ -1,5 +1,6 @@
 package org.apache.flume.node.nodemanager;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -16,6 +17,8 @@ import org.apache.flume.node.NodeConfigu
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
     implements NodeConfigurationAware {
 
@@ -25,6 +28,7 @@ public class DefaultLogicalNodeManager e
   private SourceFactory sourceFactory;
   private SinkFactory sinkFactory;
 
+  private ExecutorService commandProcessorService;
   private ScheduledExecutorService monitorService;
   private LifecycleState lifecycleState;
 
@@ -83,7 +87,14 @@ public class DefaultLogicalNodeManager e
 
     statusMonitor.setNodeManager(this);
 
-    monitorService = Executors.newScheduledThreadPool(1);
+    commandProcessorService = Executors
+        .newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
+            "nodeManager-commandProcessor-%d").build());
+    monitorService = Executors.newScheduledThreadPool(
+        1,
+        new ThreadFactoryBuilder().setNameFormat(
+            "nodeManager-monitorService-%d").build());
+
     monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
 
     for (LogicalNode node : getNodes()) {
@@ -131,6 +142,13 @@ public class DefaultLogicalNodeManager e
       monitorService.awaitTermination(1, TimeUnit.SECONDS);
     }
 
+    commandProcessorService.shutdown();
+
+    while (!commandProcessorService.isTerminated()) {
+      logger.debug("Waiting for command processor to shutdown");
+      commandProcessorService.awaitTermination(1, TimeUnit.SECONDS);
+    }
+
     logger.debug("Node manager stopped");
 
     lifecycleState = LifecycleState.STOP;