You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/07/07 00:30:15 UTC
svn commit: r1358459 - in /incubator/flume/branches/branch-1.2.0:
flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Author: mpercy
Date: Fri Jul 6 22:30:15 2012
New Revision: 1358459
URL: http://svn.apache.org/viewvc?rev=1358459&view=rev
Log:
FLUME-1331. Catch and log all Throwables on start.
(Hari Shreedharan via Mike Percy)
Modified:
incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1358459&r1=1358458&r2=1358459&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Fri Jul 6 22:30:15 2012
@@ -128,6 +128,7 @@ public class LifecycleSupervisor impleme
process.policy = policy;
process.status.desiredState = desiredState;
+ process.status.error = false;
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
@@ -183,6 +184,10 @@ public class LifecycleSupervisor impleme
return lifecycleState;
}
+ public synchronized boolean isComponentInErrorState(LifecycleAware component){
+ return supervisedProcesses.get(component).status.error;
+
+ }
public static class MonitorRunnable implements Runnable {
public ScheduledExecutorService monitorService;
@@ -208,6 +213,10 @@ public class LifecycleSupervisor impleme
// Unsupervise has already been called on this.
logger.info("Component has already been stopped {}", lifecycleAware);
return;
+ } else if(supervisoree.status.error) {
+ logger.info("Component {} is in error state, and Flume will not" +
+ "attempt to change its state", lifecycleAware);
+ return;
}
supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
@@ -226,18 +235,43 @@ public class LifecycleSupervisor impleme
case START:
try {
lifecycleAware.start();
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("Unable to start " + lifecycleAware
+ " - Exception follows.", e);
+ if(e instanceof Error){
+ //This component can never recover, shut it down.
+ supervisoree.status.desiredState = LifecycleState.STOP;
+ try{
+ lifecycleAware.stop();
+ logger.warn("Component {} stopped, since it could not be" +
+ "successfully started due to missing dependencies",
+ lifecycleAware);
+ } catch (Throwable e1) {
+ logger.error("Unsuccessful attempt to " +
+ "shutdown component: {} due to missing dependencies." +
+ " Please shutdown the agent" +
+ "or disable this component, or the agent will be" +
+ "in an undefined state.", e1);
+ supervisoree.status.error = true;
+ if(e1 instanceof Error){
+ throw (Error)e1;
+ }
+ //Set the state to stop, so that the conf poller can
+ //proceed.
+ }
+ }
supervisoree.status.failures++;
}
break;
case STOP:
try {
lifecycleAware.stop();
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("Unable to stop " + lifecycleAware
+ " - Exception follows.", e);
+ if(e instanceof Error) {
+ throw (Error)e;
+ }
supervisoree.status.failures++;
}
break;
@@ -277,12 +311,14 @@ public class LifecycleSupervisor impleme
public LifecycleState desiredState;
public int failures;
public boolean discard;
+ public volatile boolean error;
@Override
public String toString() {
return "{ lastSeen:" + lastSeen + " lastSeenState:" + lastSeenState
+ " desiredState:" + desiredState + " firstSeen:" + firstSeen
- + " failures:" + failures + " discard:" + discard + " }";
+ + " failures:" + failures + " discard:" + discard + " error:" +
+ error + " }";
}
}
@@ -321,4 +357,5 @@ public class LifecycleSupervisor impleme
}
+
}
Modified: incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1358459&r1=1358458&r2=1358459&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Jul 6 22:30:15 2012
@@ -108,7 +108,8 @@ public class DefaultLogicalNodeManager e
* Wait for all channels to start.
*/
for(Channel ch: nodeConfiguration.getChannels().values()){
- while(ch.getLifecycleState() != LifecycleState.START){
+ while(ch.getLifecycleState() != LifecycleState.START
+ && !nodeSupervisor.isComponentInErrorState(ch)){
try {
logger.info("Waiting for channel: " + ch.getName() +
" to start. Sleeping for 500 ms");