You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/25 23:57:27 UTC

svn commit: r1161772 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/sink/RollingFileSink.java test/java/org/apache/flume/sink/TestRollingFileSink.java

Author: esammer
Date: Thu Aug 25 21:57:27 2011
New Revision: 1161772

URL: http://svn.apache.org/viewvc?rev=1161772&view=rev
Log:
- RollingFileSink now implements Configurable. Updated tests.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1161772&r1=1161771&r2=1161772&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Thu Aug 25 21:57:27 2011
@@ -13,6 +13,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.EventFormatter;
 import org.apache.flume.formatter.output.PathManager;
 import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
@@ -20,9 +21,10 @@ import org.apache.flume.lifecycle.Lifecy
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-public class RollingFileSink extends AbstractEventSink {
+public class RollingFileSink extends AbstractEventSink implements Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(RollingFileSink.class);
@@ -44,7 +46,21 @@ public class RollingFileSink extends Abs
     counterGroup = new CounterGroup();
     pathController = new PathManager();
     shouldRotate = false;
-    rollInterval = defaultRollInterval;
+  }
+
+  @Override
+  public void configure(Context context) {
+    File directory = context.get("sink.directory", File.class);
+    Long rollInterval = context.get("sink.rollInterval", Long.class);
+
+    Preconditions.checkArgument(directory != null, "Directory may not be null");
+
+    if (rollInterval == null) {
+      rollInterval = defaultRollInterval;
+    }
+
+    this.rollInterval = rollInterval;
+    this.directory = directory;
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1161772&r1=1161771&r2=1161772&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Thu Aug 25 21:57:27 2011
@@ -8,6 +8,7 @@ import java.io.IOException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.junit.After;
@@ -32,8 +33,6 @@ public class TestRollingFileSink {
     sink = new RollingFileSink();
 
     tmpDir.mkdirs();
-
-    sink.setDirectory(tmpDir);
   }
 
   @After
@@ -45,6 +44,10 @@ public class TestRollingFileSink {
   public void testLifecycle() throws InterruptedException, LifecycleException {
     Context context = new Context();
 
+    context.put("sink.directory", tmpDir);
+
+    Configurables.configure(sink, context);
+
     sink.open(context);
     sink.close(context);
   }
@@ -55,7 +58,10 @@ public class TestRollingFileSink {
 
     Context context = new Context();
 
-    sink.setRollInterval(1);
+    context.put("sink.directory", tmpDir);
+    context.put("sink.rollInterval", 1L);
+
+    Configurables.configure(sink, context);
 
     sink.open(context);