You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/07/31 06:00:50 UTC
git commit: FLUME-1408. Log uncaught Throwables thrown within
Executors.
Updated Branches:
refs/heads/trunk 5c3d966a7 -> a0a55703c
FLUME-1408. Log uncaught Throwables thrown within Executors.
(Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a0a55703
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a0a55703
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a0a55703
Branch: refs/heads/trunk
Commit: a0a55703c875b79f7851fbc91ab22c96b27b6162
Parents: 5c3d966
Author: Hari <hs...@apache.org>
Authored: Mon Jul 30 20:58:57 2012 -0700
Committer: Hari <hs...@apache.org>
Committed: Mon Jul 30 20:58:57 2012 -0700
----------------------------------------------------------------------
.../flume/instrumentation/GangliaServer.java | 73 ++++----
.../flume/lifecycle/LifecycleSupervisor.java | 148 +++++++-------
.../file/AbstractFileConfigurationProvider.java | 3 +
.../org/apache/flume/sink/hdfs/BucketWriter.java | 10 +-
.../org/apache/flume/sink/hdfs/HDFSEventSink.java | 1 -
5 files changed, 123 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
index 1104141..d93cd33 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
@@ -37,7 +37,6 @@ import javax.management.AttributeList;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
-import javax.management.ObjectName;
import org.apache.flume.Context;
import org.apache.flume.FlumeException;
import org.apache.flume.api.HostInfo;
@@ -332,45 +331,49 @@ public class GangliaServer implements MonitorService {
@Override
public void run() {
- Set<ObjectInstance> queryMBeans = null;
try {
- queryMBeans = mbeanServer.queryMBeans(
- null, null);
- } catch (Exception ex) {
- logger.error("Could not get Mbeans for monitoring", ex);
- Throwables.propagate(ex);
- }
- for (ObjectInstance obj : queryMBeans) {
+ Set<ObjectInstance> queryMBeans = null;
try {
- if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
- continue;
- }
- MBeanAttributeInfo[] attrs = mbeanServer.
- getMBeanInfo(obj.getObjectName()).getAttributes();
- String strAtts[] = new String[attrs.length];
- for (int i = 0; i < strAtts.length; i++) {
- strAtts[i] = attrs[i].getName();
- }
- AttributeList attrList = mbeanServer.getAttributes(
- obj.getObjectName(), strAtts);
- String component = obj.getObjectName().toString().substring(
- obj.getObjectName().toString().indexOf('=') + 1);
- for (Object attr : attrList) {
- Attribute localAttr = (Attribute) attr;
- if (isGanglia3) {
- server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
- + localAttr.getName(),
- localAttr.getValue().toString());
- } else {
- server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
- + localAttr.getName(),
- localAttr.getValue().toString());
+ queryMBeans = mbeanServer.queryMBeans(
+ null, null);
+ } catch (Exception ex) {
+ logger.error("Could not get Mbeans for monitoring", ex);
+ Throwables.propagate(ex);
+ }
+ for (ObjectInstance obj : queryMBeans) {
+ try {
+ if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
+ continue;
+ }
+ MBeanAttributeInfo[] attrs = mbeanServer.
+ getMBeanInfo(obj.getObjectName()).getAttributes();
+ String strAtts[] = new String[attrs.length];
+ for (int i = 0; i < strAtts.length; i++) {
+ strAtts[i] = attrs[i].getName();
}
- server.sendToGangliaNodes();
+ AttributeList attrList = mbeanServer.getAttributes(
+ obj.getObjectName(), strAtts);
+ String component = obj.getObjectName().toString().substring(
+ obj.getObjectName().toString().indexOf('=') + 1);
+ for (Object attr : attrList) {
+ Attribute localAttr = (Attribute) attr;
+ if (isGanglia3) {
+ server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
+ + localAttr.getName(),
+ localAttr.getValue().toString());
+ } else {
+ server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
+ + localAttr.getName(),
+ localAttr.getValue().toString());
+ }
+ server.sendToGangliaNodes();
+ }
+ } catch (Exception ex) {
+ logger.error("Error getting mbean attributes", ex);
}
- } catch (Exception ex) {
- logger.error("Error getting mbean attributes", ex);
}
+ } catch(Throwable t) {
+ logger.error("Unexpected error", t);
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
index 2ac94df..78eda05 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
@@ -201,94 +201,94 @@ public class LifecycleSupervisor implements LifecycleAware {
long now = System.currentTimeMillis();
- if (supervisoree.status.firstSeen == null) {
- logger.debug("first time seeing {}", lifecycleAware);
+ try {
+ if (supervisoree.status.firstSeen == null) {
+ logger.debug("first time seeing {}", lifecycleAware);
- supervisoree.status.firstSeen = now;
- }
-
- supervisoree.status.lastSeen = now;
- synchronized (lifecycleAware) {
- if (supervisoree.status.discard) {
- // Unsupervise has already been called on this.
- logger.info("Component has already been stopped {}", lifecycleAware);
- return;
- } else if(supervisoree.status.error) {
- logger.info("Component {} is in error state, and Flume will not" +
- "attempt to change its state", lifecycleAware);
- return;
+ supervisoree.status.firstSeen = now;
}
- supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
+ supervisoree.status.lastSeen = now;
+ synchronized (lifecycleAware) {
+ if (supervisoree.status.discard) {
+ // Unsupervise has already been called on this.
+ logger.info("Component has already been stopped {}", lifecycleAware);
+ return;
+ } else if (supervisoree.status.error) {
+ logger.info("Component {} is in error state, and Flume will not"
+ + "attempt to change its state", lifecycleAware);
+ return;
+ }
+
+ supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
- if (!lifecycleAware.getLifecycleState().equals(
- supervisoree.status.desiredState)) {
+ if (!lifecycleAware.getLifecycleState().equals(
+ supervisoree.status.desiredState)) {
- logger
- .debug("Want to transition {} from {} to {} (failures:{})",
- new Object[] { lifecycleAware,
- supervisoree.status.lastSeenState,
+ logger.debug("Want to transition {} from {} to {} (failures:{})",
+ new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
supervisoree.status.desiredState,
supervisoree.status.failures });
- switch (supervisoree.status.desiredState) {
- case START:
- try {
- lifecycleAware.start();
- } catch (Throwable e) {
- logger.error("Unable to start " + lifecycleAware
- + " - Exception follows.", e);
- if(e instanceof Error){
- //This component can never recover, shut it down.
- supervisoree.status.desiredState = LifecycleState.STOP;
- try{
- lifecycleAware.stop();
- logger.warn("Component {} stopped, since it could not be" +
- "successfully started due to missing dependencies",
- lifecycleAware);
- } catch (Throwable e1) {
- logger.error("Unsuccessful attempt to " +
- "shutdown component: {} due to missing dependencies." +
- " Please shutdown the agent" +
- "or disable this component, or the agent will be" +
- "in an undefined state.", e1);
- supervisoree.status.error = true;
- if(e1 instanceof Error){
- throw (Error)e1;
+ switch (supervisoree.status.desiredState) {
+ case START:
+ try {
+ lifecycleAware.start();
+ } catch (Throwable e) {
+ logger.error("Unable to start " + lifecycleAware
+ + " - Exception follows.", e);
+ if (e instanceof Error) {
+ // This component can never recover, shut it down.
+ supervisoree.status.desiredState = LifecycleState.STOP;
+ try {
+ lifecycleAware.stop();
+ logger.warn("Component {} stopped, since it could not be"
+ + "successfully started due to missing dependencies",
+ lifecycleAware);
+ } catch (Throwable e1) {
+ logger.error("Unsuccessful attempt to "
+ + "shutdown component: {} due to missing dependencies."
+ + " Please shutdown the agent"
+ + "or disable this component, or the agent will be"
+ + "in an undefined state.", e1);
+ supervisoree.status.error = true;
+ if (e1 instanceof Error) {
+ throw (Error) e1;
+ }
+ // Set the state to stop, so that the conf poller can
+ // proceed.
+ }
+ }
+ supervisoree.status.failures++;
+ }
+ break;
+ case STOP:
+ try {
+ lifecycleAware.stop();
+ } catch (Throwable e) {
+ logger.error("Unable to stop " + lifecycleAware
+ + " - Exception follows.", e);
+ if (e instanceof Error) {
+ throw (Error) e;
+ }
+ supervisoree.status.failures++;
}
- //Set the state to stop, so that the conf poller can
- //proceed.
- }
+ break;
+ default:
+ logger.warn("I refuse to acknowledge {} as a desired state",
+ supervisoree.status.desiredState);
}
- supervisoree.status.failures++;
- }
- break;
- case STOP:
- try {
- lifecycleAware.stop();
- } catch (Throwable e) {
- logger.error("Unable to stop " + lifecycleAware
- + " - Exception follows.", e);
- if(e instanceof Error) {
- throw (Error)e;
+
+ if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
+ logger.error(
+ "Policy {} of {} has been violated - supervisor should exit!",
+ supervisoree.policy, lifecycleAware);
}
- supervisoree.status.failures++;
}
- break;
- default:
- logger.warn("I refuse to acknowledge {} as a desired state",
- supervisoree.status.desiredState);
- }
-
- if (!supervisoree.policy.isValid(
- lifecycleAware, supervisoree.status)) {
- logger.error(
- "Policy {} of {} has been violated - supervisor should exit!",
- supervisoree.policy, lifecycleAware);
}
+ } catch(Throwable t) {
+ logger.error("Unexpected error", t);
}
- }
-
logger.debug("Status check complete");
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
index 9f900d3..a2c882b 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
@@ -206,6 +206,9 @@ public abstract class AbstractFileConfigurationProvider implements
} catch (NoClassDefFoundError e) {
logger.error("Failed to start agent because dependencies were not " +
"found in classpath. Error follows.", e);
+ } catch (Throwable t) {
+ // caught because the caller does not handle or log Throwables
+ logger.error("Unhandled error", t);
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 75ff069..6408eb9 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Throwables;
+
/**
* Internal API intended for HDFSSink use.
* This class does file rolling and handles file formats and serialization.
@@ -199,7 +201,7 @@ class BucketWriter {
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
- throw new IOException(ex);
+ throw Throwables.propagate(ex);
}
}
}
@@ -213,7 +215,11 @@ class BucketWriter {
public Void call() throws Exception {
LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
bucketPath + IN_USE_EXT, rollInterval);
- close();
+ try {
+ close();
+ } catch(Throwable t) {
+ LOG.error("Unexpected error", t);
+ }
return null;
}
};
http://git-wip-us.apache.org/repos/asf/flume/blob/a0a55703/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index d65c5a8..fcb9642 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -460,7 +460,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
LOG.info("Closing {}", entry.getKey());
- final BucketWriter callableWriter = entry.getValue();
try {
close(entry.getValue());
} catch (Exception ex) {