You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/02/03 20:46:16 UTC
svn commit: r1240313 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/main/java/org/apache/flume/
flume-ng-core/src/main/java/org/apache/flume/sink/
flume-ng-core/src/test/java/org/apache/flume/sink/
flume-ng-node/src/main/java/org/apache/fl...
Author: arvind
Date: Fri Feb 3 19:46:15 2012
New Revision: 1240313
URL: http://svn.apache.org/viewvc?rev=1240313&view=rev
Log:
FLUME-949. Collapse PollableSink into Sink interface.
(Jarcec Checho via Arvind Prabhakar)
Removed:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java Fri Feb 3 19:46:15 2012
@@ -27,4 +27,9 @@ public interface Sink extends LifecycleA
public Channel getChannel();
+ public Status process() throws EventDeliveryException;
+
+ public static enum Status {
+ READY, BACKOFF
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java Fri Feb 3 19:46:15 2012
@@ -19,25 +19,35 @@
package org.apache.flume;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.sink.PollableSinkRunner;
+import java.util.concurrent.atomic.AtomicBoolean;
-abstract public class SinkRunner implements LifecycleAware {
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- private Sink sink;
+public class SinkRunner implements LifecycleAware {
- public static SinkRunner forSink(Sink sink) {
- SinkRunner runner = null;
+ private static final Logger logger = LoggerFactory
+ .getLogger(SinkRunner.class);
+ private static final long backoffSleepIncrement = 1000;
+ private static final long maxBackoffSleep = 5000;
+
+ private CounterGroup counterGroup;
+ private PollingRunner runner;
+ private Thread runnerThread;
+ private LifecycleState lifecycleState;
- if (sink instanceof PollableSink) {
- runner = new PollableSinkRunner();
- ((PollableSinkRunner) runner).setSink((PollableSink) sink);
- } else {
- throw new IllegalArgumentException("No known runner type for sink "
- + sink);
- }
+ private Sink sink;
- return runner;
+ public SinkRunner() {
+ counterGroup = new CounterGroup();
+ lifecycleState = LifecycleState.IDLE;
+ }
+
+ public SinkRunner(Sink sink) {
+ this();
+ setSink(sink);
}
public Sink getSink() {
@@ -48,4 +58,92 @@ abstract public class SinkRunner impleme
this.sink = sink;
}
+ @Override
+ public void start() {
+ Sink sink = getSink();
+
+ sink.start();
+
+ runner = new PollingRunner();
+
+ runner.sink = sink;
+ runner.counterGroup = counterGroup;
+ runner.shouldStop = new AtomicBoolean();
+
+ runnerThread = new Thread(runner);
+ runnerThread.start();
+
+ lifecycleState = LifecycleState.START;
+ }
+
+ @Override
+ public void stop() {
+
+ getSink().stop();
+
+ if (runnerThread != null) {
+ runner.shouldStop.set(true);
+ runnerThread.interrupt();
+
+ while (runnerThread.isAlive()) {
+ try {
+ logger.debug("Waiting for runner thread to exit");
+ runnerThread.join(500);
+ } catch (InterruptedException e) {
+ logger
+ .debug(
+ "Interrupted while waiting for runner thread to exit. Exception follows.",
+ e);
+ }
+ }
+ }
+
+ lifecycleState = LifecycleState.STOP;
+ }
+
+ @Override
+ public String toString() {
+ return "SinkRunner: { sink:" + getSink() + " counterGroup:"
+ + counterGroup + " }";
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+ public static class PollingRunner implements Runnable {
+
+ private Sink sink;
+ private AtomicBoolean shouldStop;
+ private CounterGroup counterGroup;
+
+ @Override
+ public void run() {
+ logger.debug("Polling sink runner starting");
+
+ while (!shouldStop.get()) {
+ try {
+ if (sink.process().equals(Sink.Status.BACKOFF)) {
+ counterGroup.incrementAndGet("runner.backoffs");
+
+ Thread.sleep(Math.min(
+ counterGroup.incrementAndGet("runner.backoffs.consecutive")
+ * backoffSleepIncrement, maxBackoffSleep));
+ } else {
+ counterGroup.set("runner.backoffs.consecutive", 0L);
+ }
+ } catch (InterruptedException e) {
+ logger.debug("Interrupted while processing an event. Exiting.");
+ counterGroup.incrementAndGet("runner.interruptions");
+ } catch (EventDeliveryException e) {
+ logger.error("Unable to deliver event. Exception follows.", e);
+ counterGroup.incrementAndGet("runner.errors");
+ }
+ }
+
+ logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
+ }
+
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Fri Feb 3 19:46:15 2012
@@ -37,7 +37,6 @@ import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
@@ -111,8 +110,7 @@ import com.google.common.base.Preconditi
* TODO
* </p>
*/
-public class AvroSink extends AbstractSink implements PollableSink,
- Configurable {
+public class AvroSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
private static final Integer defaultBatchSize = 100;
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Fri Feb 3 19:46:15 2012
@@ -20,7 +20,6 @@ package org.apache.flume.sink;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.slf4j.Logger;
@@ -48,7 +47,7 @@ import org.slf4j.LoggerFactory;
* TODO
* </p>
*/
-public class LoggerSink extends AbstractSink implements PollableSink {
+public class LoggerSink extends AbstractSink {
private static final Logger logger = LoggerFactory
.getLogger(LoggerSink.class);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Fri Feb 3 19:46:15 2012
@@ -21,7 +21,6 @@ import org.apache.flume.Channel;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.slf4j.Logger;
@@ -45,7 +44,7 @@ import org.slf4j.LoggerFactory;
* TODO
* </p>
*/
-public class NullSink extends AbstractSink implements PollableSink {
+public class NullSink extends AbstractSink {
private static final Logger logger = LoggerFactory.getLogger(NullSink.class);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Fri Feb 3 19:46:15 2012
@@ -31,7 +31,6 @@ import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.EventFormatter;
@@ -43,8 +42,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-public class RollingFileSink extends AbstractSink implements PollableSink,
- Configurable {
+public class RollingFileSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(RollingFileSink.class);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Fri Feb 3 19:46:15 2012
@@ -31,7 +31,7 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
+import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
@@ -113,11 +113,11 @@ public class TestAvroSink {
transaction.close();
for (int i = 0; i < 5; i++) {
- PollableSink.Status status = sink.process();
- Assert.assertEquals(PollableSink.Status.READY, status);
+ Sink.Status status = sink.process();
+ Assert.assertEquals(Sink.Status.READY, status);
}
- Assert.assertEquals(PollableSink.Status.BACKOFF, sink.process());
+ Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
sink.stop();
Assert.assertTrue(LifecycleController.waitForOneOf(sink,
@@ -151,19 +151,19 @@ public class TestAvroSink {
transaction.close();
for (int i = 0; i < 5; i++) {
- PollableSink.Status status = sink.process();
- Assert.assertEquals(PollableSink.Status.BACKOFF, status);
+ Sink.Status status = sink.process();
+ Assert.assertEquals(Sink.Status.BACKOFF, status);
}
server = createServer();
server.start();
for (int i = 0; i < 5; i++) {
- PollableSink.Status status = sink.process();
- Assert.assertEquals(PollableSink.Status.READY, status);
+ Sink.Status status = sink.process();
+ Assert.assertEquals(Sink.Status.READY, status);
}
- Assert.assertEquals(PollableSink.Status.BACKOFF, sink.process());
+ Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
sink.stop();
Assert.assertTrue(LifecycleController.waitForOneOf(sink,
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java Fri Feb 3 19:46:15 2012
@@ -295,7 +295,7 @@ public class PropertiesFileConfiguration
sink.setChannel(conf.getChannels().get(
componentConfig.get("channel")));
conf.getSinkRunners().put(comp.getComponentName(),
- SinkRunner.forSink(sink));
+ new SinkRunner(sink));
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Fri Feb 3 19:46:15 2012
@@ -162,7 +162,7 @@ public class TestAbstractLogicalNodeMana
nullSink.setChannel(channel);
nodeManager.add(SourceRunner.forSource(generatorSource));
- nodeManager.add(SinkRunner.forSink(nullSink));
+ nodeManager.add(new SinkRunner(nullSink));
nodeManager.start();
boolean reached = LifecycleController.waitForOneOf(nodeManager,
@@ -198,7 +198,7 @@ public class TestAbstractLogicalNodeMana
sink.setChannel(channel);
nodeManager.add(SourceRunner.forSource(source));
- nodeManager.add(SinkRunner.forSink(sink));
+ nodeManager.add(new SinkRunner(sink));
for (int i = 0; i < 10; i++) {
nodeManager.start();
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Fri Feb 3 19:46:15 2012
@@ -38,7 +38,6 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
@@ -52,8 +51,7 @@ import org.apache.hadoop.io.compress.Com
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HDFSEventSink extends AbstractSink implements PollableSink,
- Configurable {
+public class HDFSEventSink extends AbstractSink implements Configurable {
private static final Logger LOG = LoggerFactory
.getLogger(HDFSEventSink.class);
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Fri Feb 3 19:46:15 2012
@@ -21,20 +21,17 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Calendar;
-import java.util.LinkedList;
-import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink.Status;
+import org.apache.flume.Sink.Status;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.sink.FlumeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java?rev=1240313&r1=1240312&r2=1240313&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java Fri Feb 3 19:46:15 2012
@@ -25,7 +25,6 @@ import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-public class IRCSink extends AbstractSink implements PollableSink, Configurable {
+public class IRCSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(IRCSink.class);