You are viewing a plain text version of this content. The canonical link for it (a hyperlink on the original archive page) was not preserved in this plain-text copy.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/07/07 00:30:15 UTC

svn commit: r1358459 - in /incubator/flume/branches/branch-1.2.0: flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Author: mpercy
Date: Fri Jul  6 22:30:15 2012
New Revision: 1358459

URL: http://svn.apache.org/viewvc?rev=1358459&view=rev
Log:
FLUME-1331. Catch and log all Throwables on start.

(Hari Shreedharan via Mike Percy)

Modified:
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
    incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?rev=1358459&r1=1358458&r2=1358459&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java Fri Jul  6 22:30:15 2012
@@ -128,6 +128,7 @@ public class LifecycleSupervisor impleme
 
     process.policy = policy;
     process.status.desiredState = desiredState;
+    process.status.error = false;
 
     MonitorRunnable monitorRunnable = new MonitorRunnable();
     monitorRunnable.lifecycleAware = lifecycleAware;
@@ -183,6 +184,10 @@ public class LifecycleSupervisor impleme
     return lifecycleState;
   }
 
+  public synchronized boolean isComponentInErrorState(LifecycleAware component){
+    return supervisedProcesses.get(component).status.error;
+
+  }
   public static class MonitorRunnable implements Runnable {
 
     public ScheduledExecutorService monitorService;
@@ -208,6 +213,10 @@ public class LifecycleSupervisor impleme
           // Unsupervise has already been called on this.
           logger.info("Component has already been stopped {}", lifecycleAware);
           return;
+        } else if(supervisoree.status.error) {
+          logger.info("Component {} is in error state, and Flume will not" +
+              "attempt to change its state", lifecycleAware);
+          return;
         }
 
       supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
@@ -226,18 +235,43 @@ public class LifecycleSupervisor impleme
         case START:
           try {
             lifecycleAware.start();
-          } catch (Exception e) {
+          } catch (Throwable e) {
             logger.error("Unable to start " + lifecycleAware
                 + " - Exception follows.", e);
+            if(e instanceof Error){
+              //This component can never recover, shut it down.
+              supervisoree.status.desiredState = LifecycleState.STOP;
+              try{
+                lifecycleAware.stop();
+                logger.warn("Component {} stopped, since it could not be" +
+                    "successfully started due to missing dependencies",
+                    lifecycleAware);
+              } catch (Throwable e1) {
+                logger.error("Unsuccessful attempt to " +
+                    "shutdown component: {} due to missing dependencies." +
+                    " Please shutdown the agent" +
+                    "or disable this component, or the agent will be" +
+                    "in an undefined state.", e1);
+                supervisoree.status.error = true;
+                if(e1 instanceof Error){
+                  throw (Error)e1;
+                }
+                //Set the state to stop, so that the conf poller can
+                //proceed.
+              }
+            }
             supervisoree.status.failures++;
           }
           break;
         case STOP:
           try {
             lifecycleAware.stop();
-          } catch (Exception e) {
+          } catch (Throwable e) {
             logger.error("Unable to stop " + lifecycleAware
                 + " - Exception follows.", e);
+            if(e instanceof Error) {
+              throw (Error)e;
+            }
             supervisoree.status.failures++;
           }
           break;
@@ -277,12 +311,14 @@ public class LifecycleSupervisor impleme
     public LifecycleState desiredState;
     public int failures;
     public boolean discard;
+    public volatile boolean error;
 
     @Override
     public String toString() {
       return "{ lastSeen:" + lastSeen + " lastSeenState:" + lastSeenState
           + " desiredState:" + desiredState + " firstSeen:" + firstSeen
-          + " failures:" + failures + " discard:" + discard + " }";
+          + " failures:" + failures + " discard:" + discard + " error:" +
+          error + " }";
     }
 
   }
@@ -321,4 +357,5 @@ public class LifecycleSupervisor impleme
 
   }
 
+
 }

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1358459&r1=1358458&r2=1358459&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Jul  6 22:30:15 2012
@@ -108,7 +108,8 @@ public class DefaultLogicalNodeManager e
      * Wait for all channels to start.
      */
     for(Channel ch: nodeConfiguration.getChannels().values()){
-      while(ch.getLifecycleState() != LifecycleState.START){
+      while(ch.getLifecycleState() != LifecycleState.START
+          && !nodeSupervisor.isComponentInErrorState(ch)){
         try {
           logger.info("Waiting for channel: " + ch.getName() +
               " to start. Sleeping for 500 ms");