You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/02/03 20:46:16 UTC

svn commit: r1240313 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/test/java/org/apache/flume/sink/ flume-ng-node/src/main/java/org/apache/fl...

Author: arvind
Date: Fri Feb  3 19:46:15 2012
New Revision: 1240313

URL: http://svn.apache.org/viewvc?rev=1240313&view=rev
Log:
FLUME-949. Collapse PollableSink into Sink interface.

(Jarcec Checho via Arvind Prabhakar)

Removed:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java Fri Feb  3 19:46:15 2012
@@ -27,4 +27,9 @@ public interface Sink extends LifecycleA
 
   public Channel getChannel();
 
+  public Status process() throws EventDeliveryException;
+
+  public static enum Status {
+    READY, BACKOFF
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java Fri Feb  3 19:46:15 2012
@@ -19,25 +19,35 @@
 
 package org.apache.flume;
 
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.sink.PollableSinkRunner;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-abstract public class SinkRunner implements LifecycleAware {
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-  private Sink sink;
+public class SinkRunner implements LifecycleAware {
 
-  public static SinkRunner forSink(Sink sink) {
-    SinkRunner runner = null;
+  private static final Logger logger = LoggerFactory
+      .getLogger(SinkRunner.class);
+  private static final long backoffSleepIncrement = 1000;
+  private static final long maxBackoffSleep = 5000;
+
+  private CounterGroup counterGroup;
+  private PollingRunner runner;
+  private Thread runnerThread;
+  private LifecycleState lifecycleState;
 
-    if (sink instanceof PollableSink) {
-      runner = new PollableSinkRunner();
-      ((PollableSinkRunner) runner).setSink((PollableSink) sink);
-    } else {
-      throw new IllegalArgumentException("No known runner type for sink "
-          + sink);
-    }
+  private Sink sink;
 
-    return runner;
+  public SinkRunner() {
+    counterGroup = new CounterGroup();
+    lifecycleState = LifecycleState.IDLE;
+  }
+  
+  public SinkRunner(Sink sink) {
+    this();
+    setSink(sink);
   }
 
   public Sink getSink() {
@@ -48,4 +58,92 @@ abstract public class SinkRunner impleme
     this.sink = sink;
   }
 
+  @Override
+  public void start() {
+    Sink sink = getSink();
+
+    sink.start();
+
+    runner = new PollingRunner();
+
+    runner.sink = sink;
+    runner.counterGroup = counterGroup;
+    runner.shouldStop = new AtomicBoolean();
+
+    runnerThread = new Thread(runner);
+    runnerThread.start();
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public void stop() {
+
+    getSink().stop();
+
+    if (runnerThread != null) {
+      runner.shouldStop.set(true);
+      runnerThread.interrupt();
+
+      while (runnerThread.isAlive()) {
+        try {
+          logger.debug("Waiting for runner thread to exit");
+          runnerThread.join(500);
+        } catch (InterruptedException e) {
+          logger
+              .debug(
+                  "Interrupted while waiting for runner thread to exit. Exception follows.",
+                  e);
+        }
+      }
+    }
+
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public String toString() {
+    return "SinkRunner: { sink:" + getSink() + " counterGroup:"
+        + counterGroup + " }";
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  public static class PollingRunner implements Runnable {
+
+    private Sink sink;
+    private AtomicBoolean shouldStop;
+    private CounterGroup counterGroup;
+
+    @Override
+    public void run() {
+      logger.debug("Polling sink runner starting");
+
+      while (!shouldStop.get()) {
+        try {
+          if (sink.process().equals(Sink.Status.BACKOFF)) {
+            counterGroup.incrementAndGet("runner.backoffs");
+
+            Thread.sleep(Math.min(
+                counterGroup.incrementAndGet("runner.backoffs.consecutive")
+                    * backoffSleepIncrement, maxBackoffSleep));
+          } else {
+            counterGroup.set("runner.backoffs.consecutive", 0L);
+          }
+        } catch (InterruptedException e) {
+          logger.debug("Interrupted while processing an event. Exiting.");
+          counterGroup.incrementAndGet("runner.interruptions");
+        } catch (EventDeliveryException e) {
+          logger.error("Unable to deliver event. Exception follows.", e);
+          counterGroup.incrementAndGet("runner.errors");
+        }
+      }
+
+      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
+    }
+
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Fri Feb  3 19:46:15 2012
@@ -37,7 +37,6 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
@@ -111,8 +110,7 @@ import com.google.common.base.Preconditi
  * TODO
  * </p>
  */
-public class AvroSink extends AbstractSink implements PollableSink,
-    Configurable {
+public class AvroSink extends AbstractSink implements Configurable {
 
   private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
   private static final Integer defaultBatchSize = 100;

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Fri Feb  3 19:46:15 2012
@@ -20,7 +20,6 @@ package org.apache.flume.sink;
 import org.apache.flume.Channel;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.slf4j.Logger;
@@ -48,7 +47,7 @@ import org.slf4j.LoggerFactory;
  * TODO
  * </p>
  */
-public class LoggerSink extends AbstractSink implements PollableSink {
+public class LoggerSink extends AbstractSink {
 
   private static final Logger logger = LoggerFactory
       .getLogger(LoggerSink.class);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Fri Feb  3 19:46:15 2012
@@ -21,7 +21,6 @@ import org.apache.flume.Channel;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.slf4j.Logger;
@@ -45,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * TODO
  * </p>
  */
-public class NullSink extends AbstractSink implements PollableSink {
+public class NullSink extends AbstractSink {
 
   private static final Logger logger = LoggerFactory.getLogger(NullSink.class);
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Fri Feb  3 19:46:15 2012
@@ -31,7 +31,6 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.EventFormatter;
@@ -43,8 +42,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-public class RollingFileSink extends AbstractSink implements PollableSink,
-    Configurable {
+public class RollingFileSink extends AbstractSink implements Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(RollingFileSink.class);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Fri Feb  3 19:46:15 2012
@@ -31,7 +31,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
+import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
@@ -113,11 +113,11 @@ public class TestAvroSink {
     transaction.close();
 
     for (int i = 0; i < 5; i++) {
-      PollableSink.Status status = sink.process();
-      Assert.assertEquals(PollableSink.Status.READY, status);
+      Sink.Status status = sink.process();
+      Assert.assertEquals(Sink.Status.READY, status);
     }
 
-    Assert.assertEquals(PollableSink.Status.BACKOFF, sink.process());
+    Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
 
     sink.stop();
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
@@ -151,19 +151,19 @@ public class TestAvroSink {
     transaction.close();
 
     for (int i = 0; i < 5; i++) {
-      PollableSink.Status status = sink.process();
-      Assert.assertEquals(PollableSink.Status.BACKOFF, status);
+      Sink.Status status = sink.process();
+      Assert.assertEquals(Sink.Status.BACKOFF, status);
     }
 
     server = createServer();
     server.start();
 
     for (int i = 0; i < 5; i++) {
-      PollableSink.Status status = sink.process();
-      Assert.assertEquals(PollableSink.Status.READY, status);
+      Sink.Status status = sink.process();
+      Assert.assertEquals(Sink.Status.READY, status);
     }
 
-    Assert.assertEquals(PollableSink.Status.BACKOFF, sink.process());
+    Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
 
     sink.stop();
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java Fri Feb  3 19:46:15 2012
@@ -295,7 +295,7 @@ public class PropertiesFileConfiguration
       sink.setChannel(conf.getChannels().get(
           componentConfig.get("channel")));
       conf.getSinkRunners().put(comp.getComponentName(),
-          SinkRunner.forSink(sink));
+          new SinkRunner(sink));
     }
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Fri Feb  3 19:46:15 2012
@@ -162,7 +162,7 @@ public class TestAbstractLogicalNodeMana
     nullSink.setChannel(channel);
 
     nodeManager.add(SourceRunner.forSource(generatorSource));
-    nodeManager.add(SinkRunner.forSink(nullSink));
+    nodeManager.add(new SinkRunner(nullSink));
 
     nodeManager.start();
     boolean reached = LifecycleController.waitForOneOf(nodeManager,
@@ -198,7 +198,7 @@ public class TestAbstractLogicalNodeMana
     sink.setChannel(channel);
 
     nodeManager.add(SourceRunner.forSource(source));
-    nodeManager.add(SinkRunner.forSink(sink));
+    nodeManager.add(new SinkRunner(sink));
 
     for (int i = 0; i < 10; i++) {
       nodeManager.start();

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Fri Feb  3 19:46:15 2012
@@ -38,7 +38,6 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
@@ -52,8 +51,7 @@ import org.apache.hadoop.io.compress.Com
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HDFSEventSink extends AbstractSink implements PollableSink,
-    Configurable {
+public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final Logger LOG = LoggerFactory
       .getLogger(HDFSEventSink.class);
 

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Fri Feb  3 19:46:15 2012
@@ -21,20 +21,17 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Calendar;
-import java.util.LinkedList;
-import java.util.List;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink.Status;
+import org.apache.flume.Sink.Status;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java Fri Feb  3 19:46:15 2012
@@ -25,7 +25,6 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.sink.AbstractSink;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-public class IRCSink extends AbstractSink implements PollableSink, Configurable {
+public class IRCSink extends AbstractSink implements Configurable {
 
   private static final Logger logger = LoggerFactory.getLogger(IRCSink.class);