You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/12 21:09:46 UTC
svn commit: r1299818 - in
/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume:
SinkRunner.java source/PollableSourceRunner.java
Author: arvind
Date: Mon Mar 12 20:09:46 2012
New Revision: 1299818
URL: http://svn.apache.org/viewvc?rev=1299818&view=rev
Log:
FLUME-984. SinkRunner should catch unhandled exceptions and log them.
(Brock Noland via Arvind Prabhakar)
Modified:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java?rev=1299818&r1=1299817&r2=1299818&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java Mon Mar 12 20:09:46 2012
@@ -21,8 +21,8 @@ package org.apache.flume;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,6 +85,8 @@ public class SinkRunner implements Lifec
runner.shouldStop = new AtomicBoolean();
runnerThread = new Thread(runner);
+ runnerThread.setName("SinkRunner-PollingRunner-" +
+ policy.getClass().getSimpleName());
runnerThread.start();
lifecycleState = LifecycleState.START;
@@ -157,10 +159,18 @@ public class SinkRunner implements Lifec
counterGroup.incrementAndGet("runner.interruptions");
} catch (EventDeliveryException e) {
logger.error("Unable to deliver event. Exception follows.", e);
+ counterGroup.incrementAndGet("runner.deliveryErrors");
+ } catch (Exception e) {
counterGroup.incrementAndGet("runner.errors");
+ logger.error("Unhandled exception, logging and sleeping for " +
+ maxBackoffSleep + "ms", e);
+ try {
+ Thread.sleep(maxBackoffSleep);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
}
}
-
logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java?rev=1299818&r1=1299817&r2=1299818&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java Mon Mar 12 20:09:46 2012
@@ -22,6 +22,7 @@ package org.apache.flume.source;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.CounterGroup;
+import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.SourceRunner;
import org.apache.flume.lifecycle.LifecycleState;
@@ -91,9 +92,9 @@ public class PollableSourceRunner extend
runnerThread.join();
} catch (InterruptedException e) {
logger
- .warn(
- "Interrupted while waiting for polling runner to stop. Please report this.",
- e);
+ .warn(
+ "Interrupted while waiting for polling runner to stop. Please report this.",
+ e);
Thread.currentThread().interrupt();
}
@@ -108,6 +109,7 @@ public class PollableSourceRunner extend
+ counterGroup + " }";
}
+ @Override
public LifecycleState getLifecycleState() {
return lifecycleState;
}
@@ -131,15 +133,25 @@ public class PollableSourceRunner extend
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
- * backoffSleepIncrement, maxBackoffSleep));
+ * backoffSleepIncrement, maxBackoffSleep));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.info("Source runner interrupted. Exiting");
+ counterGroup.incrementAndGet("runner.interruptions");
+ } catch (EventDeliveryException e) {
+ logger.error("Unable to deliver event. Exception follows.", e);
+ counterGroup.incrementAndGet("runner.deliveryErrors");
} catch (Exception e) {
- logger.error("Unable to process event. Exception follows.", e);
- counterGroup.incrementAndGet("runner.failures");
+ counterGroup.incrementAndGet("runner.errors");
+ logger.error("Unhandled exception, logging and sleeping for " +
+ maxBackoffSleep + "ms", e);
+ try {
+ Thread.sleep(maxBackoffSleep);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
}
}