You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/05 20:00:21 UTC

svn commit: r1210573 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume: sink/PollableSinkRunner.java source/PollableSourceRunner.java

Author: esammer
Date: Mon Dec  5 19:00:21 2011
New Revision: 1210573

URL: http://svn.apache.org/viewvc?rev=1210573&view=rev
Log:
FLUME-867: Pollable source and sink runners should reduce poll frequency (i.e. lengthen the sleep between polls) after several consecutive BACKOFFs

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java?rev=1210573&r1=1210572&r2=1210573&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java Mon Dec  5 19:00:21 2011
@@ -33,6 +33,8 @@ public class PollableSinkRunner extends 
 
   private static final Logger logger = LoggerFactory
       .getLogger(PollableSinkRunner.class);
+  private static final long backoffSleepIncrement = 1000;
+  private static final long maxBackoffSleep = 5000;
 
   private CounterGroup counterGroup;
   private PollingRunner runner;
@@ -112,8 +114,12 @@ public class PollableSinkRunner extends 
         try {
           if (sink.process().equals(PollableSink.Status.BACKOFF)) {
             counterGroup.incrementAndGet("runner.backoffs");
-            /* Should this be configurable? */
-            Thread.sleep(500);
+
+            Thread.sleep(Math.min(
+                counterGroup.incrementAndGet("runner.backoffs.consecutive")
+                    * backoffSleepIncrement, maxBackoffSleep));
+          } else {
+            counterGroup.set("runner.backoffs.consecutive", 0L);
           }
         } catch (InterruptedException e) {
           logger.debug("Interrupted while processing an event. Exiting.");

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java?rev=1210573&r1=1210572&r2=1210573&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java Mon Dec  5 19:00:21 2011
@@ -47,8 +47,8 @@ public class PollableSourceRunner extend
 
   private static final Logger logger = LoggerFactory
       .getLogger(PollableSourceRunner.class);
-  private static final long backoffSleepIncrement = 100;
-  private static final long maxBackoffSleep = 500;
+  private static final long backoffSleepIncrement = 1000;
+  private static final long maxBackoffSleep = 5000;
 
   private AtomicBoolean shouldStop;