You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/23 17:50:21 UTC
svn commit: r1304474 - in /incubator/flume/trunk/flume-ng-core/src:
main/java/org/apache/flume/sink/FailoverSinkProcessor.java
test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
Author: arvind
Date: Fri Mar 23 16:50:21 2012
New Revision: 1304474
URL: http://svn.apache.org/viewvc?rev=1304474&view=rev
Log:
FLUME-1030. Retry mechanism for failover sink processor.
(Juhani Connolly via Arvind Prabhakar)
Modified:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java?rev=1304474&r1=1304473&r2=1304474&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java Fri Mar 23 16:50:21 2012
@@ -21,6 +21,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -34,12 +36,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * FailoverSinkProcessor maintains a prioritized list of sinks,
+ * guaranteeing that so long as one is available events will be processed.
+ *
+ * The failover mechanism works by relegating failed sinks to a pool
+ * where they are assigned a cooldown period, increasing with sequential
+ * failures before they are retried. Once a sink successfully sends an
+ * event it is restored to the live pool.
+ *
* FailoverSinkProcessor is in no way thread safe and expects to be run via
* SinkRunner. Additionally, setSinks must be called before configure, and
* additional sinks cannot be added while running
*
* To configure, set a sink groups processor to "failover" and set priorities
- * for individual sinks, all priorities must be unique:
+ * for individual sinks, all priorities must be unique. Furthermore, an
+ * upper limit to failover time can be set (in milliseconds) using maxpenalty
*
* Ex)
*
@@ -49,18 +60,63 @@ import org.slf4j.LoggerFactory;
* host1.sinkgroups.group1.processor.type = failover
* host1.sinkgroups.group1.processor.priority.sink1 = 5
* host1.sinkgroups.group1.processor.priority.sink2 = 10
+ * host1.sinkgroups.group1.processor.maxpenalty = 10000
*
*/
public class FailoverSinkProcessor implements SinkProcessor {
+ private static final int FAILURE_PENALTY = 1000;
+ private static final int DEFAULT_MAX_PENALTY = 30000;
+
+ /**
+  * A sink that has been removed from the live pool after a failure, paired
+  * with the earliest time (epoch milliseconds) at which it may be retried.
+  *
+  * Instances compare by that retry time, so an ordered collection of them
+  * hands back the sink whose cooldown expires first.
+  */
+ private class FailedSink implements Comparable<FailedSink> {
+   // Earliest System.currentTimeMillis() value at which a retry is allowed.
+   private Long refresh;
+   // Priority the sink held in the live pool; reused when it is restored.
+   private Integer priority;
+   private Sink sink;
+   // Consecutive failures so far; drives the exponential cooldown.
+   private Integer sequentialFailures;
+
+   /**
+    * @param priority priority the sink had in the live pool
+    * @param sink the sink that failed
+    * @param seqFailures number of sequential failures recorded so far
+    */
+   public FailedSink(Integer priority, Sink sink, int seqFailures) {
+     this.sink = sink;
+     this.priority = priority;
+     this.sequentialFailures = seqFailures;
+     adjustRefresh();
+   }
+
+   /** Orders failed sinks by ascending retry time. */
+   @Override
+   public int compareTo(FailedSink arg0) {
+     return refresh.compareTo(arg0.refresh);
+   }
+
+   /** @return the time, in epoch milliseconds, after which a retry is due */
+   public Long getRefresh() {
+     return refresh;
+   }
+
+   /** @return the failed sink */
+   public Sink getSink() {
+     return sink;
+   }
+
+   /** @return the priority the sink held in the live pool */
+   public Integer getPriority() {
+     return priority;
+   }
+
+   /** Records one more sequential failure and pushes the retry time out. */
+   public void incFails() {
+     sequentialFailures++;
+     adjustRefresh();
+     logger.debug("Sink {} failed again, new refresh is at {}, " +
+         "current time {}", new Object[] {
+         sink.getName(), refresh, System.currentTimeMillis()});
+   }
+
+   /**
+    * Recomputes the retry time as now plus an exponential cooldown of
+    * FAILURE_PENALTY * 2^sequentialFailures, capped at maxPenalty.
+    */
+   private void adjustRefresh() {
+     // Use long arithmetic and a bounded shift. As an int,
+     // (1 << n) * FAILURE_PENALTY wraps negative once n reaches 22 (and the
+     // shift count itself wraps at 32); Math.min would then select that
+     // negative value and place refresh in the past, cancelling the cooldown.
+     long backoff = (1L << Math.min(sequentialFailures, 30)) * FAILURE_PENALTY;
+     refresh = System.currentTimeMillis()
+         + Math.min((long) maxPenalty, backoff);
+   }
+ }
+
+
private static final Logger logger = LoggerFactory
.getLogger(FailoverSinkProcessor.class);
private static final String PRIORITY_PREFIX = "priority.";
+ private static final String MAX_PENALTY_PREFIX = "maxpenalty";
private Map<String, Sink> sinks;
private Sink activeSink;
private SortedMap<Integer, Sink> liveSinks;
- private SortedMap<Integer, Sink> deadSinks;
+ private Queue<FailedSink> failedSinks;
private LifecycleState state;
+ private int maxPenalty;
@Override
public void start() {
@@ -86,16 +142,26 @@ public class FailoverSinkProcessor imple
@Override
public void configure(Context context) {
liveSinks = new TreeMap<Integer, Sink>();
- deadSinks = new TreeMap<Integer, Sink>();
+ failedSinks = new PriorityQueue<FailedSink>();
Integer nextPrio = 0;
+ String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
+ if(maxPenaltyStr == null) {
+ maxPenalty = DEFAULT_MAX_PENALTY;
+ } else {
+ try {
+ maxPenalty = Integer.parseInt(maxPenaltyStr);
+ } catch (NumberFormatException e) {
+ logger.warn("{} is not a valid value for {}",
+ new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
+ maxPenalty = DEFAULT_MAX_PENALTY;
+ }
+ }
for (Entry<String, Sink> entry : sinks.entrySet()) {
String priStr = PRIORITY_PREFIX + entry.getKey();
Integer priority;
try {
priority = Integer.parseInt(context.getString(priStr));
- } catch (NumberFormatException e) {
- priority = --nextPrio;
- } catch (NullPointerException e) {
+ } catch (Exception e) {
priority = --nextPrio;
}
if(!liveSinks.containsKey(priority)) {
@@ -111,39 +177,48 @@ public class FailoverSinkProcessor imple
@Override
public Status process() throws EventDeliveryException {
+ // Retry any failed sinks that have gone through their "cooldown" period
+ Long now = System.currentTimeMillis();
+ while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
+ FailedSink cur = failedSinks.poll();
+ Status s;
+ try {
+ s = cur.getSink().process();
+ if (s == Status.READY) {
+ liveSinks.put(cur.getPriority(), cur.getSink());
+ activeSink = liveSinks.get(liveSinks.lastKey());
+ logger.debug("Sink {} was recovered from the fail list",
+ cur.getSink().getName());
+ } else {
+ // if it's a backoff it needn't be penalized.
+ failedSinks.add(cur);
+ }
+ return s;
+ } catch (Exception e) {
+ cur.incFails();
+ failedSinks.add(cur);
+ }
+ }
+
Status ret = null;
while(activeSink != null) {
try {
ret = activeSink.process();
return ret;
- } catch (EventDeliveryException e) {
+ } catch (Exception e) {
+ logger.warn("Sink {} failed and has been sent to failover list",
+ activeSink.getName(), e);
activeSink = moveActiveToDeadAndGetNext();
}
}
- // if none of the live ones worked, give the dead ones a go,
- // then give up
- for (Entry<Integer, Sink> entry : deadSinks.entrySet()) {
- try {
- ret = entry.getValue().process();
- // if it worked, put it in the liveSinks pile and rejoice
- if (ret == Status.READY) {
- activeSink = entry.getValue();
- liveSinks.put(entry.getKey(), entry.getValue());
- deadSinks.remove(entry.getKey());
- return ret;
- }
- } catch (EventDeliveryException e) {
-
- }
- }
throw new EventDeliveryException("All sinks failed to process, " +
"nothing left to failover to");
}
private Sink moveActiveToDeadAndGetNext() {
Integer key = liveSinks.lastKey();
- deadSinks.put(key, activeSink);
+ failedSinks.add(new FailedSink(key, activeSink, 1));
liveSinks.remove(key);
if(liveSinks.isEmpty()) return null;
if(liveSinks.lastKey() != null) {
Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java?rev=1304474&r1=1304473&r2=1304474&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java Fri Mar 23 16:50:21 2012
@@ -125,7 +125,7 @@ public class TestFailoverSinkProcessor {
* Test failover by feeding events to the channel and verifying at various
* stages that the number of events consumed by each sink matches expected
* failover patterns
- *
+ *
* @throws InterruptedException
*/
@Test
@@ -159,6 +159,7 @@ public class TestFailoverSinkProcessor {
params.put("processor.priority.s1", "3");
params.put("processor.priority.s2", "2");
params.put("processor.priority.s3", "1");
+ params.put("processor.maxpenalty", "10000");
context.putAll(params);
Configurables.configure(group, context);
SinkRunner runner = new SinkRunner(group.getProcessor());
@@ -190,6 +191,10 @@ public class TestFailoverSinkProcessor {
Assert.assertEquals(new Integer(5), s3.getWritten());
// test rollover to recovered servers
s2.setRemaining(20);
+
+ // get us past the retry time for the failed sink
+ Thread.sleep(5000);
+
for(int i = 0; i < 100; i++) {
Transaction tx = ch.getTransaction();
tx.begin();
@@ -200,8 +205,8 @@ public class TestFailoverSinkProcessor {
Thread.sleep(1000);
Assert.assertEquals(new Integer(10), s1.getWritten());
- Assert.assertEquals(new Integer(55), s2.getWritten());
- Assert.assertEquals(new Integer(100), s3.getWritten());
+ Assert.assertEquals(new Integer(70), s2.getWritten());
+ Assert.assertEquals(new Integer(85), s3.getWritten());
runner.stop();
ch.stop();