You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/12 21:09:46 UTC

svn commit: r1299818 - in /incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume: SinkRunner.java source/PollableSourceRunner.java

Author: arvind
Date: Mon Mar 12 20:09:46 2012
New Revision: 1299818

URL: http://svn.apache.org/viewvc?rev=1299818&view=rev
Log:
FLUME-984. SinkRunner should catch unhandled exceptions and log them.

(Brock Noland via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java?rev=1299818&r1=1299817&r2=1299818&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java Mon Mar 12 20:09:46 2012
@@ -21,8 +21,8 @@ package org.apache.flume;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,6 +85,8 @@ public class SinkRunner implements Lifec
     runner.shouldStop = new AtomicBoolean();
 
     runnerThread = new Thread(runner);
+    runnerThread.setName("SinkRunner-PollingRunner-" +
+        policy.getClass().getSimpleName());
     runnerThread.start();
 
     lifecycleState = LifecycleState.START;
@@ -157,10 +159,18 @@ public class SinkRunner implements Lifec
           counterGroup.incrementAndGet("runner.interruptions");
         } catch (EventDeliveryException e) {
           logger.error("Unable to deliver event. Exception follows.", e);
+          counterGroup.incrementAndGet("runner.deliveryErrors");
+        } catch (Exception e) {
           counterGroup.incrementAndGet("runner.errors");
+          logger.error("Unhandled exception, logging and sleeping for " +
+              maxBackoffSleep + "ms", e);
+          try {
+            Thread.sleep(maxBackoffSleep);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+          }
         }
       }
-
       logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
     }
 

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java?rev=1299818&r1=1299817&r2=1299818&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java Mon Mar 12 20:09:46 2012
@@ -22,6 +22,7 @@ package org.apache.flume.source;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flume.CounterGroup;
+import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -91,9 +92,9 @@ public class PollableSourceRunner extend
       runnerThread.join();
     } catch (InterruptedException e) {
       logger
-          .warn(
-              "Interrupted while waiting for polling runner to stop. Please report this.",
-              e);
+      .warn(
+          "Interrupted while waiting for polling runner to stop. Please report this.",
+          e);
       Thread.currentThread().interrupt();
     }
 
@@ -108,6 +109,7 @@ public class PollableSourceRunner extend
         + counterGroup + " }";
   }
 
+  @Override
   public LifecycleState getLifecycleState() {
     return lifecycleState;
   }
@@ -131,15 +133,25 @@ public class PollableSourceRunner extend
 
             Thread.sleep(Math.min(
                 counterGroup.incrementAndGet("runner.backoffs.consecutive")
-                    * backoffSleepIncrement, maxBackoffSleep));
+                * backoffSleepIncrement, maxBackoffSleep));
           } else {
             counterGroup.set("runner.backoffs.consecutive", 0L);
           }
         } catch (InterruptedException e) {
           logger.info("Source runner interrupted. Exiting");
+          counterGroup.incrementAndGet("runner.interruptions");
+        } catch (EventDeliveryException e) {
+          logger.error("Unable to deliver event. Exception follows.", e);
+          counterGroup.incrementAndGet("runner.deliveryErrors");
         } catch (Exception e) {
-          logger.error("Unable to process event. Exception follows.", e);
-          counterGroup.incrementAndGet("runner.failures");
+          counterGroup.incrementAndGet("runner.errors");
+          logger.error("Unhandled exception, logging and sleeping for " +
+              maxBackoffSleep + "ms", e);
+          try {
+            Thread.sleep(maxBackoffSleep);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+          }
         }
       }