You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/30 07:36:11 UTC

svn commit: r1307278 - /incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Author: arvind
Date: Fri Mar 30 05:36:11 2012
New Revision: 1307278

URL: http://svn.apache.org/viewvc?rev=1307278&view=rev
Log:
FLUME-1079. Flume agent reconfiguration enters permanent bad state.

(Hari Shreedharan via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Modified: incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1307278&r1=1307277&r2=1307278&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Mar 30 05:36:11 2012
@@ -56,30 +56,44 @@ public class DefaultLogicalNodeManager e
     if (this.nodeConfiguration != null) {
       logger
           .info("Shutting down old configuration: {}", this.nodeConfiguration);
-      for (Entry<String, SinkRunner> entry : this.nodeConfiguration.getSinkRunners()
-          .entrySet()) {
-        nodeSupervisor.unsupervise(entry.getValue());
+      for (Entry<String, SinkRunner> entry :
+        this.nodeConfiguration.getSinkRunners().entrySet()) {
+        try{
+          nodeSupervisor.unsupervise(entry.getValue());
+        } catch (Exception e){
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
       }
 
       for (Entry<String, SourceRunner> entry : this.nodeConfiguration
           .getSourceRunners().entrySet()) {
-        nodeSupervisor.unsupervise(entry.getValue());
+        try{
+          nodeSupervisor.unsupervise(entry.getValue());
+        } catch (Exception e){
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
       }
     }
 
     this.nodeConfiguration = nodeConfiguration;
     for (Entry<String, SinkRunner> entry : nodeConfiguration.getSinkRunners()
         .entrySet()) {
-
-      nodeSupervisor.supervise(entry.getValue(),
+      try{
+        nodeSupervisor.supervise(entry.getValue(),
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e) {
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
     }
 
     for (Entry<String, SourceRunner> entry : nodeConfiguration
         .getSourceRunners().entrySet()) {
-
-      nodeSupervisor.supervise(entry.getValue(),
+      try{
+        nodeSupervisor.supervise(entry.getValue(),
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e) {
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
     }
   }