You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/07/31 06:00:50 UTC

git commit: FLUME-1408. Log uncaught Throwables thrown within Executors.

Updated Branches:
  refs/heads/trunk 5c3d966a7 -> a0a55703c


FLUME-1408. Log uncaught Throwables thrown within Executors.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a0a55703
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a0a55703
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a0a55703

Branch: refs/heads/trunk
Commit: a0a55703c875b79f7851fbc91ab22c96b27b6162
Parents: 5c3d966
Author: Hari <hs...@apache.org>
Authored: Mon Jul 30 20:58:57 2012 -0700
Committer: Hari <hs...@apache.org>
Committed: Mon Jul 30 20:58:57 2012 -0700

----------------------------------------------------------------------
 .../flume/instrumentation/GangliaServer.java       |   73 ++++----
 .../flume/lifecycle/LifecycleSupervisor.java       |  148 +++++++-------
 .../file/AbstractFileConfigurationProvider.java    |    3 +
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   10 +-
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |    1 -
 5 files changed, 123 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
index 1104141..d93cd33 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
@@ -37,7 +37,6 @@ import javax.management.AttributeList;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectInstance;
-import javax.management.ObjectName;
 import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.HostInfo;
@@ -332,45 +331,49 @@ public class GangliaServer implements MonitorService {
 
     @Override
     public void run() {
-      Set<ObjectInstance> queryMBeans = null;
       try {
-        queryMBeans = mbeanServer.queryMBeans(
-                null, null);
-      } catch (Exception ex) {
-        logger.error("Could not get Mbeans for monitoring", ex);
-        Throwables.propagate(ex);
-      }
-      for (ObjectInstance obj : queryMBeans) {
+        Set<ObjectInstance> queryMBeans = null;
         try {
-          if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
-            continue;
-          }
-          MBeanAttributeInfo[] attrs = mbeanServer.
-                  getMBeanInfo(obj.getObjectName()).getAttributes();
-          String strAtts[] = new String[attrs.length];
-          for (int i = 0; i < strAtts.length; i++) {
-            strAtts[i] = attrs[i].getName();
-          }
-          AttributeList attrList = mbeanServer.getAttributes(
-                  obj.getObjectName(), strAtts);
-          String component = obj.getObjectName().toString().substring(
-              obj.getObjectName().toString().indexOf('=') + 1);
-          for (Object attr : attrList) {
-            Attribute localAttr = (Attribute) attr;
-            if (isGanglia3) {
-              server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
-                      + localAttr.getName(),
-                      localAttr.getValue().toString());
-            } else {
-              server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
-                      + localAttr.getName(),
-                      localAttr.getValue().toString());
+          queryMBeans = mbeanServer.queryMBeans(
+                  null, null);
+        } catch (Exception ex) {
+          logger.error("Could not get Mbeans for monitoring", ex);
+          Throwables.propagate(ex);
+        }
+        for (ObjectInstance obj : queryMBeans) {
+          try {
+            if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
+              continue;
+            }
+            MBeanAttributeInfo[] attrs = mbeanServer.
+                    getMBeanInfo(obj.getObjectName()).getAttributes();
+            String strAtts[] = new String[attrs.length];
+            for (int i = 0; i < strAtts.length; i++) {
+              strAtts[i] = attrs[i].getName();
             }
-            server.sendToGangliaNodes();
+            AttributeList attrList = mbeanServer.getAttributes(
+                    obj.getObjectName(), strAtts);
+            String component = obj.getObjectName().toString().substring(
+                obj.getObjectName().toString().indexOf('=') + 1);
+            for (Object attr : attrList) {
+              Attribute localAttr = (Attribute) attr;
+              if (isGanglia3) {
+                server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
+                        + localAttr.getName(),
+                        localAttr.getValue().toString());
+              } else {
+                server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
+                        + localAttr.getName(),
+                        localAttr.getValue().toString());
+              }
+              server.sendToGangliaNodes();
+            }
+          } catch (Exception ex) {
+            logger.error("Error getting mbean attributes", ex);
           }
-        } catch (Exception ex) {
-          logger.error("Error getting mbean attributes", ex);
         }
+      } catch(Throwable t) {
+        logger.error("Unexpected error", t);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
index 2ac94df..78eda05 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
@@ -201,94 +201,94 @@ public class LifecycleSupervisor implements LifecycleAware {
 
       long now = System.currentTimeMillis();
 
-      if (supervisoree.status.firstSeen == null) {
-        logger.debug("first time seeing {}", lifecycleAware);
+      try {
+        if (supervisoree.status.firstSeen == null) {
+          logger.debug("first time seeing {}", lifecycleAware);
 
-        supervisoree.status.firstSeen = now;
-      }
-
-      supervisoree.status.lastSeen = now;
-      synchronized (lifecycleAware) {
-        if (supervisoree.status.discard) {
-          // Unsupervise has already been called on this.
-          logger.info("Component has already been stopped {}", lifecycleAware);
-          return;
-        } else if(supervisoree.status.error) {
-          logger.info("Component {} is in error state, and Flume will not" +
-              "attempt to change its state", lifecycleAware);
-          return;
+          supervisoree.status.firstSeen = now;
         }
 
-      supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
+        supervisoree.status.lastSeen = now;
+        synchronized (lifecycleAware) {
+          if (supervisoree.status.discard) {
+            // Unsupervise has already been called on this.
+            logger.info("Component has already been stopped {}", lifecycleAware);
+            return;
+          } else if (supervisoree.status.error) {
+            logger.info("Component {} is in error state, and Flume will not"
+                + "attempt to change its state", lifecycleAware);
+            return;
+          }
+
+          supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
 
-      if (!lifecycleAware.getLifecycleState().equals(
-          supervisoree.status.desiredState)) {
+          if (!lifecycleAware.getLifecycleState().equals(
+              supervisoree.status.desiredState)) {
 
-        logger
-            .debug("Want to transition {} from {} to {} (failures:{})",
-                new Object[] { lifecycleAware,
-                    supervisoree.status.lastSeenState,
+            logger.debug("Want to transition {} from {} to {} (failures:{})",
+                new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                     supervisoree.status.desiredState,
                     supervisoree.status.failures });
 
-        switch (supervisoree.status.desiredState) {
-        case START:
-          try {
-            lifecycleAware.start();
-          } catch (Throwable e) {
-            logger.error("Unable to start " + lifecycleAware
-                + " - Exception follows.", e);
-            if(e instanceof Error){
-              //This component can never recover, shut it down.
-              supervisoree.status.desiredState = LifecycleState.STOP;
-              try{
-                lifecycleAware.stop();
-                logger.warn("Component {} stopped, since it could not be" +
-                    "successfully started due to missing dependencies",
-                    lifecycleAware);
-              } catch (Throwable e1) {
-                logger.error("Unsuccessful attempt to " +
-                    "shutdown component: {} due to missing dependencies." +
-                    " Please shutdown the agent" +
-                    "or disable this component, or the agent will be" +
-                    "in an undefined state.", e1);
-                supervisoree.status.error = true;
-                if(e1 instanceof Error){
-                  throw (Error)e1;
+            switch (supervisoree.status.desiredState) {
+              case START:
+                try {
+                  lifecycleAware.start();
+                } catch (Throwable e) {
+                  logger.error("Unable to start " + lifecycleAware
+                      + " - Exception follows.", e);
+                  if (e instanceof Error) {
+                    // This component can never recover, shut it down.
+                    supervisoree.status.desiredState = LifecycleState.STOP;
+                    try {
+                      lifecycleAware.stop();
+                      logger.warn("Component {} stopped, since it could not be"
+                          + "successfully started due to missing dependencies",
+                          lifecycleAware);
+                    } catch (Throwable e1) {
+                      logger.error("Unsuccessful attempt to "
+                          + "shutdown component: {} due to missing dependencies."
+                          + " Please shutdown the agent"
+                          + "or disable this component, or the agent will be"
+                          + "in an undefined state.", e1);
+                      supervisoree.status.error = true;
+                      if (e1 instanceof Error) {
+                        throw (Error) e1;
+                      }
+                      // Set the state to stop, so that the conf poller can
+                      // proceed.
+                    }
+                  }
+                  supervisoree.status.failures++;
+                }
+                break;
+              case STOP:
+                try {
+                  lifecycleAware.stop();
+                } catch (Throwable e) {
+                  logger.error("Unable to stop " + lifecycleAware
+                      + " - Exception follows.", e);
+                  if (e instanceof Error) {
+                    throw (Error) e;
+                  }
+                  supervisoree.status.failures++;
                 }
-                //Set the state to stop, so that the conf poller can
-                //proceed.
-              }
+                break;
+              default:
+                logger.warn("I refuse to acknowledge {} as a desired state",
+                    supervisoree.status.desiredState);
             }
-            supervisoree.status.failures++;
-          }
-          break;
-        case STOP:
-          try {
-            lifecycleAware.stop();
-          } catch (Throwable e) {
-            logger.error("Unable to stop " + lifecycleAware
-                + " - Exception follows.", e);
-            if(e instanceof Error) {
-              throw (Error)e;
+
+            if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
+              logger.error(
+                  "Policy {} of {} has been violated - supervisor should exit!",
+                  supervisoree.policy, lifecycleAware);
             }
-            supervisoree.status.failures++;
           }
-          break;
-        default:
-          logger.warn("I refuse to acknowledge {} as a desired state",
-              supervisoree.status.desiredState);
-        }
-
-          if (!supervisoree.policy.isValid(
-              lifecycleAware, supervisoree.status)) {
-          logger.error(
-              "Policy {} of {} has been violated - supervisor should exit!",
-              supervisoree.policy, lifecycleAware);
         }
+      } catch(Throwable t) {
+        logger.error("Unexpected error", t);
       }
-      }
-
       logger.debug("Status check complete");
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
index 9f900d3..a2c882b 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
@@ -206,6 +206,9 @@ public abstract class AbstractFileConfigurationProvider implements
         } catch (NoClassDefFoundError e) {
           logger.error("Failed to start agent because dependencies were not " +
               "found in classpath. Error follows.", e);
+        } catch (Throwable t) {
+          // caught because the caller does not handle or log Throwables
+          logger.error("Unhandled error", t);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 75ff069..6408eb9 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Throwables;
+
 /**
  * Internal API intended for HDFSSink use.
  * This class does file rolling and handles file formats and serialization.
@@ -199,7 +201,7 @@ class BucketWriter {
         if (ex instanceof IOException) {
           throw (IOException) ex;
         } else {
-          throw new IOException(ex);
+          throw Throwables.propagate(ex);
         }
       }
     }
@@ -213,7 +215,11 @@ class BucketWriter {
         public Void call() throws Exception {
           LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
               bucketPath + IN_USE_EXT, rollInterval);
-          close();
+          try {
+            close();
+          } catch(Throwable t) {
+            LOG.error("Unexpected error", t);
+          }
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index d65c5a8..fcb9642 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -460,7 +460,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
       LOG.info("Closing {}", entry.getKey());
 
-      final BucketWriter callableWriter = entry.getValue();
       try {
         close(entry.getValue());
       } catch (Exception ex) {