You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/05 20:00:21 UTC
svn commit: r1210573 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume: sink/PollableSinkRunner.java source/PollableSourceRunner.java
Author: esammer
Date: Mon Dec 5 19:00:21 2011
New Revision: 1210573
URL: http://svn.apache.org/viewvc?rev=1210573&view=rev
Log:
FLUME-867: Pollable source and sink runners should poll less frequently (increase the sleep between polls) after several consecutive BACKOFFs
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java?rev=1210573&r1=1210572&r2=1210573&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java Mon Dec 5 19:00:21 2011
@@ -33,6 +33,8 @@ public class PollableSinkRunner extends
private static final Logger logger = LoggerFactory
.getLogger(PollableSinkRunner.class);
+ private static final long backoffSleepIncrement = 1000;
+ private static final long maxBackoffSleep = 5000;
private CounterGroup counterGroup;
private PollingRunner runner;
@@ -112,8 +114,12 @@ public class PollableSinkRunner extends
try {
if (sink.process().equals(PollableSink.Status.BACKOFF)) {
counterGroup.incrementAndGet("runner.backoffs");
- /* Should this be configurable? */
- Thread.sleep(500);
+
+ Thread.sleep(Math.min(
+ counterGroup.incrementAndGet("runner.backoffs.consecutive")
+ * backoffSleepIncrement, maxBackoffSleep));
+ } else {
+ counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.debug("Interrupted while processing an event. Exiting.");
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java?rev=1210573&r1=1210572&r2=1210573&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java Mon Dec 5 19:00:21 2011
@@ -47,8 +47,8 @@ public class PollableSourceRunner extend
private static final Logger logger = LoggerFactory
.getLogger(PollableSourceRunner.class);
- private static final long backoffSleepIncrement = 100;
- private static final long maxBackoffSleep = 500;
+ private static final long backoffSleepIncrement = 1000;
+ private static final long maxBackoffSleep = 5000;
private AtomicBoolean shouldStop;