You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/23 17:50:21 UTC

svn commit: r1304474 - in /incubator/flume/trunk/flume-ng-core/src: main/java/org/apache/flume/sink/FailoverSinkProcessor.java test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java

Author: arvind
Date: Fri Mar 23 16:50:21 2012
New Revision: 1304474

URL: http://svn.apache.org/viewvc?rev=1304474&view=rev
Log:
FLUME-1030. Retry mechanism for failover sink processor.

(Juhani Connolly via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java?rev=1304474&r1=1304473&r2=1304474&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java Fri Mar 23 16:50:21 2012
@@ -21,6 +21,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -34,12 +36,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * FailoverSinkProcessor maintains a prioritized list of sinks,
+ * guaranteeing that events will be processed as long as one sink is available.
+ *
+ * The failover mechanism works by relegating failed sinks to a pool
+ * where they are assigned a cooldown period, which grows with sequential
+ * failures, before they are retried. Once a sink successfully sends an
+ * event, it is restored to the live pool.
+ *
  * FailoverSinkProcessor is in no way thread safe and expects to be run via
  * SinkRunner Additionally, setSinks must be called before configure, and
  * additional sinks cannot be added while running
  *
  * To configure, set a sink groups processor to "failover" and set priorities
- * for individual sinks, all priorities must be unique:
+ * for individual sinks; all priorities must be unique. Furthermore, an
+ * upper limit on the retry cooldown can be set (in milliseconds) using maxpenalty.
  *
  * Ex)
  *
@@ -49,18 +60,63 @@ import org.slf4j.LoggerFactory;
  * host1.sinkgroups.group1.processor.type = failover
  * host1.sinkgroups.group1.processor.priority.sink1 = 5
  * host1.sinkgroups.group1.processor.priority.sink2 = 10
+ * host1.sinkgroups.group1.processor.maxpenalty = 10000
  *
  */
 public class FailoverSinkProcessor implements SinkProcessor {
+  private static final int FAILURE_PENALTY = 1000;
+  private static final int DEFAULT_MAX_PENALTY = 30000;
+
+  private class FailedSink implements Comparable<FailedSink> { // cooldown entry, ordered by refresh
+    private Long refresh; // earliest retry time (epoch millis)
+    private Integer priority; // key the sink is restored under in liveSinks
+    private Sink sink;
+    private Integer sequentialFailures; // drives the exponential cooldown
+    public FailedSink(Integer priority, Sink sink, int seqFailures) {
+      this.sink = sink;
+      this.priority = priority;
+      this.sequentialFailures = seqFailures;
+      adjustRefresh();
+    }
+    @Override
+    public int compareTo(FailedSink arg0) {
+      return refresh.compareTo(arg0.refresh);
+    }
+
+    public Long getRefresh() {
+      return refresh;
+    }
+    public Sink getSink() {
+      return sink;
+    }
+    public Integer getPriority() {
+      return priority;
+    }
+    public void incFails() {
+      sequentialFailures++;
+      adjustRefresh();
+      logger.debug("Sink {} failed again, new refresh is at {}, " +
+            "current time {}", new Object[] {
+              sink.getName(), refresh, System.currentTimeMillis()});
+    }
+    private void adjustRefresh() {
+      // Compute in long with a capped shift: the int form overflowed from 22 failures on.
+      long penalty = (1L << Math.min(sequentialFailures, 30)) * FAILURE_PENALTY;
+      refresh = System.currentTimeMillis() + Math.min(maxPenalty, penalty);
+    }
+  }
+
   private static final Logger logger = LoggerFactory
       .getLogger(FailoverSinkProcessor.class);
 
   private static final String PRIORITY_PREFIX = "priority.";
+  private static final String MAX_PENALTY_PREFIX = "maxpenalty";
   private Map<String, Sink> sinks;
   private Sink activeSink;
   private SortedMap<Integer, Sink> liveSinks;
-  private SortedMap<Integer, Sink> deadSinks;
+  private Queue<FailedSink> failedSinks;
   private LifecycleState state;
+  private int maxPenalty;
 
   @Override
   public void start() {
@@ -86,16 +142,26 @@ public class FailoverSinkProcessor imple
   @Override
   public void configure(Context context) {
     liveSinks = new TreeMap<Integer, Sink>();
-    deadSinks = new TreeMap<Integer, Sink>();
+    failedSinks = new PriorityQueue<FailedSink>();
     Integer nextPrio = 0;
+    String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
+    if(maxPenaltyStr == null) {
+      maxPenalty = DEFAULT_MAX_PENALTY;
+    } else {
+      try {
+        maxPenalty = Integer.parseInt(maxPenaltyStr);
+      } catch (NumberFormatException e) {
+        logger.warn("{} is not a valid value for {}",
+                new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
+        maxPenalty  = DEFAULT_MAX_PENALTY;
+      }
+    }
     for (Entry<String, Sink> entry : sinks.entrySet()) {
       String priStr = PRIORITY_PREFIX + entry.getKey();
       Integer priority;
       try {
         priority =  Integer.parseInt(context.getString(priStr));
-      } catch (NumberFormatException e) {
-        priority = --nextPrio;
-      } catch (NullPointerException e) {
+      } catch (Exception e) {
         priority = --nextPrio;
       }
       if(!liveSinks.containsKey(priority)) {
@@ -111,39 +177,48 @@ public class FailoverSinkProcessor imple
 
   @Override
   public Status process() throws EventDeliveryException {
+    // Retry any failed sinks that have gone through their "cooldown" period
+    Long now = System.currentTimeMillis();
+    while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { // peek() = earliest refresh
+      FailedSink cur = failedSinks.poll();
+      Status s;
+      try {
+        s = cur.getSink().process();
+        if (s  == Status.READY) {
+          liveSinks.put(cur.getPriority(), cur.getSink());
+          activeSink = liveSinks.get(liveSinks.lastKey()); // live sink with the largest priority value
+          logger.debug("Sink {} was recovered from the fail list",
+                  cur.getSink().getName());
+        } else {
+          // if it's a backoff it needn't be penalized; refresh is unchanged, so it is retried next call.
+          failedSinks.add(cur);
+        }
+        return s; // the first retried sink that does not throw ends this call; live sinks are not tried
+      } catch (Exception e) {
+        cur.incFails(); // exception itself is not logged; incFails only logs the new refresh at debug
+        failedSinks.add(cur);
+      }
+    }
+    // Reached only if no cooled-down sink was due, or every due one threw.
     Status ret = null;
     while(activeSink != null) {
       try {
         ret = activeSink.process();
         return ret;
-      } catch (EventDeliveryException e) {
+      } catch (Exception e) { // any exception, not only EventDeliveryException, triggers failover
+        logger.warn("Sink {} failed and has been sent to failover list",
+                activeSink.getName(), e);
         activeSink = moveActiveToDeadAndGetNext();
       }
     }
 
-    // if none of the live ones worked, give the dead ones a go,
-    // then give up
-    for (Entry<Integer, Sink> entry : deadSinks.entrySet()) {
-      try {
-        ret = entry.getValue().process();
-        // if it worked, put it in the liveSinks pile and rejoice
-        if (ret == Status.READY) {
-          activeSink = entry.getValue();
-          liveSinks.put(entry.getKey(), entry.getValue());
-          deadSinks.remove(entry.getKey());
-          return ret;
-        }
-      } catch (EventDeliveryException e) {
-
-      }
-    }
     throw new EventDeliveryException("All sinks failed to process, " +
         "nothing left to failover to");
   }
 
   private Sink moveActiveToDeadAndGetNext() {
     Integer key = liveSinks.lastKey();
-    deadSinks.put(key, activeSink);
+    failedSinks.add(new FailedSink(key, activeSink, 1));
     liveSinks.remove(key);
     if(liveSinks.isEmpty()) return null;
     if(liveSinks.lastKey() != null) {

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java?rev=1304474&r1=1304473&r2=1304474&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java Fri Mar 23 16:50:21 2012
@@ -125,7 +125,7 @@ public class TestFailoverSinkProcessor {
    * Test failover by feeding events to the channel and verifying at various
    * stages that the number of events consumed by each sink matches expected
    * failover patterns
-   * 
+   *
    * @throws InterruptedException
    */
   @Test
@@ -159,6 +159,7 @@ public class TestFailoverSinkProcessor {
     params.put("processor.priority.s1", "3");
     params.put("processor.priority.s2", "2");
     params.put("processor.priority.s3", "1");
+    params.put("processor.maxpenalty", "10000");
     context.putAll(params);
     Configurables.configure(group, context);
     SinkRunner runner = new SinkRunner(group.getProcessor());
@@ -190,6 +191,10 @@ public class TestFailoverSinkProcessor {
     Assert.assertEquals(new Integer(5), s3.getWritten());
     // test rollover to recovered servers
     s2.setRemaining(20);
+
+    // get us past the retry time for the failed sink
+    Thread.sleep(5000);
+
     for(int i = 0; i < 100; i++) {
       Transaction tx = ch.getTransaction();
       tx.begin();
@@ -200,8 +205,8 @@ public class TestFailoverSinkProcessor {
     Thread.sleep(1000);
 
     Assert.assertEquals(new Integer(10), s1.getWritten());
-    Assert.assertEquals(new Integer(55), s2.getWritten());
-    Assert.assertEquals(new Integer(100), s3.getWritten());
+    Assert.assertEquals(new Integer(70), s2.getWritten());
+    Assert.assertEquals(new Integer(85), s3.getWritten());
 
     runner.stop();
     ch.stop();