You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/30 07:36:11 UTC
svn commit: r1307278 - /incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Author: arvind
Date: Fri Mar 30 05:36:11 2012
New Revision: 1307278
URL: http://svn.apache.org/viewvc?rev=1307278&view=rev
Log:
FLUME-1079. Flume agent reconfiguration enters permanent bad state.
(Hari Shreedharan via Arvind Prabhakar)
Modified:
incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Modified: incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1307278&r1=1307277&r2=1307278&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Mar 30 05:36:11 2012
@@ -56,30 +56,44 @@ public class DefaultLogicalNodeManager e
if (this.nodeConfiguration != null) {
logger
.info("Shutting down old configuration: {}", this.nodeConfiguration);
- for (Entry<String, SinkRunner> entry : this.nodeConfiguration.getSinkRunners()
- .entrySet()) {
- nodeSupervisor.unsupervise(entry.getValue());
+ for (Entry<String, SinkRunner> entry :
+ this.nodeConfiguration.getSinkRunners().entrySet()) {
+ try{
+ nodeSupervisor.unsupervise(entry.getValue());
+ } catch (Exception e){
+ logger.error("Error while stopping {}", entry.getValue(), e);
+ }
}
for (Entry<String, SourceRunner> entry : this.nodeConfiguration
.getSourceRunners().entrySet()) {
- nodeSupervisor.unsupervise(entry.getValue());
+ try{
+ nodeSupervisor.unsupervise(entry.getValue());
+ } catch (Exception e){
+ logger.error("Error while stopping {}", entry.getValue(), e);
+ }
}
}
this.nodeConfiguration = nodeConfiguration;
for (Entry<String, SinkRunner> entry : nodeConfiguration.getSinkRunners()
.entrySet()) {
-
- nodeSupervisor.supervise(entry.getValue(),
+ try{
+ nodeSupervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
}
for (Entry<String, SourceRunner> entry : nodeConfiguration
.getSourceRunners().entrySet()) {
-
- nodeSupervisor.supervise(entry.getValue(),
+ try{
+ nodeSupervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
}
}