You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/19 20:10:35 UTC

svn commit: r1159724 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/sink/RollingFileSink.java test/java/org/apache/flume/sink/TestRollingFileSink.java

Author: esammer
Date: Fri Aug 19 18:10:34 2011
New Revision: 1159724

URL: http://svn.apache.org/viewvc?rev=1159724&view=rev
Log:
- Added a sink that supports rolling file output. Extremely basic. Just an initial prototype to support more
  realistic testing / use cases during development.

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1159724&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Fri Aug 19 18:10:34 2011
@@ -0,0 +1,186 @@
+package org.apache.flume.sink;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.formatter.output.PathManager;
+import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class RollingFileSink extends AbstractEventSink {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(RollingFileSink.class);
+  private static final long defaultRollInterval = 30;
+
+  private File directory;
+  private long rollInterval;
+  private OutputStream outputStream;
+  private ScheduledExecutorService rollService;
+
+  private CounterGroup counterGroup;
+
+  private PathManager pathController;
+  private TextDelimitedOutputFormatter formatter;
+  private volatile boolean shouldRotate;
+
+  /**
+   * Creates a sink with a fresh formatter, counter group and path manager,
+   * the rotation flag cleared, and the roll interval set to
+   * {@code defaultRollInterval}.
+   */
+  public RollingFileSink() {
+    this.rollInterval = defaultRollInterval;
+    this.shouldRotate = false;
+
+    this.formatter = new TextDelimitedOutputFormatter();
+    this.counterGroup = new CounterGroup();
+    this.pathController = new PathManager();
+  }
+
+  /**
+   * Opens the sink: passes the configured directory to the path manager as
+   * its base directory and starts a single-threaded scheduler that raises the
+   * rotation flag every {@code rollInterval} seconds.
+   *
+   * @param context forwarded unchanged to the superclass {@code open}
+   */
+  @Override
+  public void open(Context context) throws InterruptedException,
+      LifecycleException {
+
+    super.open(context);
+
+    pathController.setBaseDirectory(directory);
+
+    /* The roller thread name embeds the id of the thread that opened the sink. */
+    String rollerNameFormat = "rollingFileSink-roller-"
+        + Thread.currentThread().getId() + "-%d";
+
+    rollService = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat(rollerNameFormat).build());
+
+    /*
+     * Every N seconds, mark that it's time to rotate. We purposefully do NOT
+     * touch anything other than the indicator flag to avoid error handling
+     * issues (e.g. IO exceptions occurring in two different threads). Resist
+     * the urge to actually perform rotation in a separate thread!
+     */
+    Runnable markForRotation = new Runnable() {
+
+      @Override
+      public void run() {
+        logger.debug("Marking time to rotate file {}",
+            pathController.getCurrentFile());
+        shouldRotate = true;
+      }
+
+    };
+
+    rollService.scheduleAtFixedRate(markForRotation, rollInterval,
+        rollInterval, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Formats one event and writes it to the current output file, rotating to
+   * a new file first if the roll timer has flagged that a rotation is due.
+   * The stream is flushed after every event.
+   *
+   * @param context not used by this method
+   * @param event the event to format and write
+   * @throws EventDeliveryException if the current file cannot be closed for
+   *         rotation, the output file cannot be opened, or the write/flush
+   *         fails
+   */
+  @Override
+  public void append(Context context, Event event) throws InterruptedException,
+      EventDeliveryException {
+
+    if (shouldRotate) {
+      logger.debug("Time to rotate {}", pathController.getCurrentFile());
+
+      if (outputStream == null) {
+        /*
+         * No file is open, so nothing has been written to the current file
+         * and there is nothing to rotate. Clear the flag; otherwise it would
+         * stay set and force a rotation right after the first event is
+         * written, producing a one-event file.
+         */
+        shouldRotate = false;
+      } else {
+        logger.debug("Closing file {}", pathController.getCurrentFile());
+
+        try {
+          outputStream.close();
+          shouldRotate = false;
+        } catch (IOException e) {
+          /* Flag and stream are left untouched so the next append retries. */
+          throw new EventDeliveryException("Unable to rotate file "
+              + pathController.getCurrentFile() + " while delivering event", e);
+        }
+
+        outputStream = null;
+        pathController.rotate();
+      }
+    }
+
+    /* Lazily open the output file: first event, or first event after a rotation. */
+    if (outputStream == null) {
+      try {
+        logger.debug("Opening output stream for file {}",
+            pathController.getCurrentFile());
+
+        outputStream = new BufferedOutputStream(new FileOutputStream(
+            pathController.getCurrentFile()));
+      } catch (IOException e) {
+        throw new EventDeliveryException("Failed to open file "
+            + pathController.getCurrentFile() + " while delivering event", e);
+      }
+    }
+
+    try {
+      byte[] bytes = formatter.format(event);
+
+      /*
+       * FIXME: Feature: Rotate on size and time by checking bytes written and
+       * setting shouldRotate = true if we're past a threshold.
+       */
+      counterGroup.addAndGet("sink.bytesWritten", (long) bytes.length);
+
+      outputStream.write(bytes);
+
+      /*
+       * FIXME: Feature: Control flush interval based on time or number of
+       * events. For now, we're super-conservative and flush on each write.
+       */
+      outputStream.flush();
+    } catch (IOException e) {
+      throw new EventDeliveryException("Failed to write event:" + event, e);
+    }
+  }
+
+  /**
+   * Closes the sink: flushes and closes the open output file (if any), then
+   * shuts down the roll timer and waits for it to terminate.
+   *
+   * The timer is shut down even when closing the file fails, so its thread
+   * is not leaked. The stream reference is always cleared so that a later
+   * {@code open} / {@code append} opens a file instead of writing to a
+   * closed stream.
+   *
+   * @param context forwarded unchanged to the superclass {@code close}
+   * @throws LifecycleException if flushing or closing the output file fails
+   * @throws InterruptedException if interrupted while waiting for the roll
+   *         timer to terminate
+   */
+  @Override
+  public void close(Context context) throws InterruptedException,
+      LifecycleException {
+
+    super.close(context);
+
+    try {
+      if (outputStream != null) {
+        logger.debug("Closing file {}", pathController.getCurrentFile());
+
+        try {
+          try {
+            outputStream.flush();
+          } finally {
+            /* Release the file handle even if the final flush failed. */
+            outputStream.close();
+          }
+        } catch (IOException e) {
+          throw new LifecycleException("Unable to close output stream", e);
+        } finally {
+          outputStream = null;
+        }
+      }
+    } finally {
+      /* rollService is only created by open(); guard against close-without-open. */
+      if (rollService != null) {
+        rollService.shutdown();
+
+        while (!rollService.isTerminated()) {
+          rollService.awaitTermination(1, TimeUnit.SECONDS);
+        }
+      }
+    }
+  }
+
+  /** @return the directory handed to the path manager when the sink is opened */
+  public File getDirectory() {
+    return this.directory;
+  }
+
+  /**
+   * @param directory the directory to pass to the path manager as its base
+   *        directory; it is read when the sink is opened
+   */
+  public void setDirectory(File directory) {
+    this.directory = directory;
+  }
+
+  /** @return the number of seconds between rotation marks */
+  public long getRollInterval() {
+    return this.rollInterval;
+  }
+
+  /**
+   * @param rollInterval seconds between rotation marks; used as both the
+   *        initial delay and the period when the sink is opened
+   */
+  public void setRollInterval(long rollInterval) {
+    this.rollInterval = rollInterval;
+  }
+
+  /** @return the formatter used to turn each event into bytes */
+  public TextDelimitedOutputFormatter getFormatter() {
+    return this.formatter;
+  }
+
+  /** @param formatter the formatter used to turn each event into bytes */
+  public void setFormatter(TextDelimitedOutputFormatter formatter) {
+    this.formatter = formatter;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1159724&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Fri Aug 19 18:10:34 2011
@@ -0,0 +1,74 @@
+package org.apache.flume.sink;
+
+import java.io.File;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestRollingFileSink {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestRollingFileSink.class);
+
+  private File tmpDir;
+  private RollingFileSink sink;
+
+  /**
+   * Creates a uniquely named scratch directory and a sink pointed at it.
+   * The directory lives under the JVM's temporary directory
+   * ({@code java.io.tmpdir}) rather than a hard-coded {@code /tmp}, so the
+   * test is not tied to Unix-like file systems.
+   */
+  @Before
+  public void setUp() {
+    tmpDir = new File(System.getProperty("java.io.tmpdir"), "flume-rfs-"
+        + System.currentTimeMillis() + "-" + Thread.currentThread().getId());
+
+    sink = new RollingFileSink();
+
+    tmpDir.mkdirs();
+
+    sink.setDirectory(tmpDir);
+  }
+
+  @After
+  public void tearDown() {
+    tmpDir.delete();
+  }
+
+  /** Opens and immediately closes the sink without appending any events. */
+  @Test
+  public void testLifecycle() throws InterruptedException, LifecycleException {
+    Context ctx = new Context();
+
+    sink.open(ctx);
+    sink.close(ctx);
+  }
+
+  @Test
+  public void testAppend() throws InterruptedException, LifecycleException,
+      EventDeliveryException {
+
+    Context context = new Context();
+
+    sink.setRollInterval(1);
+
+    sink.open(context);
+
+    for (int i = 0; i < 10; i++) {
+      Event event = new SimpleEvent();
+
+      event.setBody(("Test event " + i).getBytes());
+      sink.append(context, event);
+
+      Thread.sleep(500);
+    }
+
+    sink.close(context);
+
+    for (String file : sink.getDirectory().list()) {
+      logger.debug("Produced file:{}", file);
+    }
+  }
+}