You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/25 23:57:27 UTC
svn commit: r1161772 - in
/incubator/flume/branches/flume-728/flume-ng-core/src:
main/java/org/apache/flume/sink/RollingFileSink.java
test/java/org/apache/flume/sink/TestRollingFileSink.java
Author: esammer
Date: Thu Aug 25 21:57:27 2011
New Revision: 1161772
URL: http://svn.apache.org/viewvc?rev=1161772&view=rev
Log:
- RollingFileSink now implements Configurable. Updated tests.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1161772&r1=1161771&r2=1161772&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Thu Aug 25 21:57:27 2011
@@ -13,6 +13,7 @@ import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.EventFormatter;
import org.apache.flume.formatter.output.PathManager;
import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
@@ -20,9 +21,10 @@ import org.apache.flume.lifecycle.Lifecy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-public class RollingFileSink extends AbstractEventSink {
+public class RollingFileSink extends AbstractEventSink implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(RollingFileSink.class);
@@ -44,7 +46,21 @@ public class RollingFileSink extends Abs
counterGroup = new CounterGroup();
pathController = new PathManager();
shouldRotate = false;
- rollInterval = defaultRollInterval;
+ }
+
+ @Override
+ public void configure(Context context) {
+ File directory = context.get("sink.directory", File.class);
+ Long rollInterval = context.get("sink.rollInterval", Long.class);
+
+ Preconditions.checkArgument(directory != null, "Directory may not be null");
+
+ if (rollInterval == null) {
+ rollInterval = defaultRollInterval;
+ }
+
+ this.rollInterval = rollInterval;
+ this.directory = directory;
}
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1161772&r1=1161771&r2=1161772&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Thu Aug 25 21:57:27 2011
@@ -8,6 +8,7 @@ import java.io.IOException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.lifecycle.LifecycleException;
import org.junit.After;
@@ -32,8 +33,6 @@ public class TestRollingFileSink {
sink = new RollingFileSink();
tmpDir.mkdirs();
-
- sink.setDirectory(tmpDir);
}
@After
@@ -45,6 +44,10 @@ public class TestRollingFileSink {
public void testLifecycle() throws InterruptedException, LifecycleException {
Context context = new Context();
+ context.put("sink.directory", tmpDir);
+
+ Configurables.configure(sink, context);
+
sink.open(context);
sink.close(context);
}
@@ -55,7 +58,10 @@ public class TestRollingFileSink {
Context context = new Context();
- sink.setRollInterval(1);
+ context.put("sink.directory", tmpDir);
+ context.put("sink.rollInterval", 1L);
+
+ Configurables.configure(sink, context);
sink.open(context);