You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:48:43 UTC
svn commit: r1156914 -
/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Author: esammer
Date: Fri Aug 12 00:48:43 2011
New Revision: 1156914
URL: http://svn.apache.org/viewvc?rev=1156914&view=rev
Log:
- Added a command processor executor service in which to process async commands in the node manager.
- Use Guava's ThreadFactoryBuilder to name thread pool threads for debugging.
Modified:
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1156914&r1=1156913&r2=1156914&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Aug 12 00:48:43 2011
@@ -1,5 +1,6 @@
package org.apache.flume.node.nodemanager;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -16,6 +17,8 @@ import org.apache.flume.node.NodeConfigu
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
implements NodeConfigurationAware {
@@ -25,6 +28,7 @@ public class DefaultLogicalNodeManager e
private SourceFactory sourceFactory;
private SinkFactory sinkFactory;
+ private ExecutorService commandProcessorService;
private ScheduledExecutorService monitorService;
private LifecycleState lifecycleState;
@@ -83,7 +87,14 @@ public class DefaultLogicalNodeManager e
statusMonitor.setNodeManager(this);
- monitorService = Executors.newScheduledThreadPool(1);
+ commandProcessorService = Executors
+ .newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
+ "nodeManager-commandProcessor-%d").build());
+ monitorService = Executors.newScheduledThreadPool(
+ 1,
+ new ThreadFactoryBuilder().setNameFormat(
+ "nodeManager-monitorService-%d").build());
+
monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
for (LogicalNode node : getNodes()) {
@@ -131,6 +142,13 @@ public class DefaultLogicalNodeManager e
monitorService.awaitTermination(1, TimeUnit.SECONDS);
}
+ commandProcessorService.shutdown();
+
+ while (!commandProcessorService.isTerminated()) {
+ logger.debug("Waiting for command processor to shutdown");
+ commandProcessorService.awaitTermination(1, TimeUnit.SECONDS);
+ }
+
logger.debug("Node manager stopped");
lifecycleState = LifecycleState.STOP;