You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/19 20:10:35 UTC
svn commit: r1159724 - in
/incubator/flume/branches/flume-728/flume-ng-core/src:
main/java/org/apache/flume/sink/RollingFileSink.java
test/java/org/apache/flume/sink/TestRollingFileSink.java
Author: esammer
Date: Fri Aug 19 18:10:34 2011
New Revision: 1159724
URL: http://svn.apache.org/viewvc?rev=1159724&view=rev
Log:
- Added a sink that supports rolling file output. Extremely basic. Just an initial prototype to support more
realistic testing / use cases during development.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1159724&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Fri Aug 19 18:10:34 2011
@@ -0,0 +1,186 @@
+package org.apache.flume.sink;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.formatter.output.PathManager;
+import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A sink that writes formatted events to a file below a configured directory
+ * and periodically rolls over to a new file.
+ * <p>
+ * A scheduler thread only raises a flag every {@code rollInterval} seconds;
+ * the actual close / rotate / re-open work is performed by the thread that
+ * calls {@link #append(Context, Event)}, so all file I/O (and its error
+ * handling) happens on the appending thread.
+ */
+public class RollingFileSink extends AbstractEventSink {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(RollingFileSink.class);
+
+  /** Roll interval, in seconds, used when none has been configured. */
+  private static final long DEFAULT_ROLL_INTERVAL = 30;
+
+  private File directory;
+  private long rollInterval;
+  private OutputStream outputStream;
+  private ScheduledExecutorService rollService;
+
+  private final CounterGroup counterGroup;
+
+  private final PathManager pathController;
+  private TextDelimitedOutputFormatter formatter;
+
+  /** Raised by the roller thread, consumed by append(). */
+  private volatile boolean shouldRotate;
+
+  /**
+   * Creates a sink with a default formatter and the default roll interval.
+   * The target directory must be supplied through
+   * {@link #setDirectory(File)} before the sink is opened.
+   */
+  public RollingFileSink() {
+    formatter = new TextDelimitedOutputFormatter();
+    counterGroup = new CounterGroup();
+    pathController = new PathManager();
+    shouldRotate = false;
+    rollInterval = DEFAULT_ROLL_INTERVAL;
+  }
+
+  /**
+   * Opens the sink and, when the roll interval is positive, starts the
+   * thread that periodically requests a file rotation.
+   *
+   * @param context the context passed through to the superclass
+   * @throws InterruptedException if thrown by the superclass
+   * @throws LifecycleException if thrown by the superclass
+   */
+  @Override
+  public void open(Context context) throws InterruptedException,
+      LifecycleException {
+
+    super.open(context);
+
+    pathController.setBaseDirectory(directory);
+
+    // Drop any rotation request left over from an earlier open/close cycle.
+    shouldRotate = false;
+
+    /*
+     * scheduleAtFixedRate() rejects a period <= 0 with an
+     * IllegalArgumentException. Treat such a value as "never roll on time"
+     * instead of failing after the executor has already been created.
+     */
+    if (rollInterval <= 0) {
+      logger.info(
+          "Roll interval {} is not positive; time-based rolling is disabled",
+          rollInterval);
+      return;
+    }
+
+    rollService = Executors.newScheduledThreadPool(
+        1,
+        new ThreadFactoryBuilder().setNameFormat(
+            "rollingFileSink-roller-" + Thread.currentThread().getId() + "-%d")
+            .build());
+
+    /*
+     * Every N seconds, mark that it's time to rotate. We purposefully do NOT
+     * touch anything other than the indicator flag to avoid error handling
+     * issues (e.g. IO exceptions occurring in two different threads). Resist
+     * the urge to actually perform rotation in a separate thread!
+     */
+    rollService.scheduleAtFixedRate(new Runnable() {
+
+      @Override
+      public void run() {
+        logger.debug("Marking time to rotate file {}",
+            pathController.getCurrentFile());
+        shouldRotate = true;
+      }
+
+    }, rollInterval, rollInterval, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Formats the event and writes it to the current file, rolling to a new
+   * file first if the roller thread has requested a rotation.
+   *
+   * @param context the delivery context (not used by this method)
+   * @param event the event to format and write
+   * @throws EventDeliveryException if the current file cannot be closed,
+   *         the next file cannot be opened, or the event cannot be written
+   */
+  @Override
+  public void append(Context context, Event event) throws InterruptedException,
+      EventDeliveryException {
+
+    if (shouldRotate) {
+      logger.debug("Time to rotate {}", pathController.getCurrentFile());
+
+      if (outputStream != null) {
+        logger.debug("Closing file {}", pathController.getCurrentFile());
+
+        try {
+          outputStream.close();
+          shouldRotate = false;
+        } catch (IOException e) {
+          throw new EventDeliveryException("Unable to rotate file "
+              + pathController.getCurrentFile() + " while delivering event", e);
+        }
+
+        outputStream = null;
+        pathController.rotate();
+      } else {
+        /*
+         * No file is open, so there is nothing to roll. Consume the request
+         * anyway; otherwise the file opened below would be rolled on the
+         * very next append after receiving a single event.
+         */
+        shouldRotate = false;
+      }
+    }
+
+    if (outputStream == null) {
+      try {
+        logger.debug("Opening output stream for file {}",
+            pathController.getCurrentFile());
+
+        outputStream = new BufferedOutputStream(new FileOutputStream(
+            pathController.getCurrentFile()));
+      } catch (IOException e) {
+        throw new EventDeliveryException("Failed to open file "
+            + pathController.getCurrentFile() + " while delivering event", e);
+      }
+    }
+
+    try {
+      byte[] bytes = formatter.format(event);
+
+      outputStream.write(bytes);
+
+      /*
+       * FIXME: Feature: Control flush interval based on time or number of
+       * events. For now, we're super-conservative and flush on each write.
+       */
+      outputStream.flush();
+
+      /*
+       * Count the bytes only once they have actually been written.
+       *
+       * FIXME: Feature: Rotate on size and time by checking bytes written and
+       * setting shouldRotate = true if we're past a threshold.
+       */
+      counterGroup.addAndGet("sink.bytesWritten", (long) bytes.length);
+    } catch (IOException e) {
+      throw new EventDeliveryException("Failed to write event:" + event, e);
+    }
+  }
+
+  /**
+   * Closes the current file, if any, and stops the roller thread. The roller
+   * thread is stopped even when closing the file fails.
+   *
+   * @param context the context passed through to the superclass
+   * @throws InterruptedException if interrupted while waiting for the roller
+   *         thread to terminate
+   * @throws LifecycleException if the output stream cannot be flushed or
+   *         closed
+   */
+  @Override
+  public void close(Context context) throws InterruptedException,
+      LifecycleException {
+
+    super.close(context);
+
+    try {
+      closeOutputStream();
+    } finally {
+      // Always stop the roller, otherwise its thread outlives the sink.
+      stopRollService();
+    }
+  }
+
+  /** Flushes and closes the current file, if one is open. */
+  private void closeOutputStream() throws LifecycleException {
+    if (outputStream == null) {
+      return;
+    }
+
+    logger.debug("Closing file {}", pathController.getCurrentFile());
+
+    try {
+      try {
+        outputStream.flush();
+      } finally {
+        // Release the underlying file even if the flush failed.
+        outputStream.close();
+      }
+    } catch (IOException e) {
+      throw new LifecycleException("Unable to close output stream", e);
+    } finally {
+      /*
+       * Forget the closed stream so a later append() opens a new one rather
+       * than writing to a closed stream, and move on to the next path (the
+       * same call append() uses when rolling) so that re-opening does not
+       * truncate the file that was just closed.
+       */
+      outputStream = null;
+      pathController.rotate();
+    }
+  }
+
+  /** Shuts the roller down, if it was started, and waits for it to finish. */
+  private void stopRollService() throws InterruptedException {
+    if (rollService == null) {
+      return;
+    }
+
+    rollService.shutdown();
+
+    while (!rollService.isTerminated()) {
+      rollService.awaitTermination(1, TimeUnit.SECONDS);
+    }
+
+    rollService = null;
+  }
+
+  /** @return the directory handed to the path manager when the sink opens */
+  public File getDirectory() {
+    return directory;
+  }
+
+  /** @param directory the directory handed to the path manager on open */
+  public void setDirectory(File directory) {
+    this.directory = directory;
+  }
+
+  /** @return the roll interval in seconds */
+  public long getRollInterval() {
+    return rollInterval;
+  }
+
+  /**
+   * @param rollInterval seconds between rotation requests; a value of zero
+   *        or less disables time-based rolling. Read when the sink is opened.
+   */
+  public void setRollInterval(long rollInterval) {
+    this.rollInterval = rollInterval;
+  }
+
+  /** @return the formatter used to turn events into bytes */
+  public TextDelimitedOutputFormatter getFormatter() {
+    return formatter;
+  }
+
+  /** @param formatter the formatter used to turn events into bytes */
+  public void setFormatter(TextDelimitedOutputFormatter formatter) {
+    this.formatter = formatter;
+  }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1159724&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Fri Aug 19 18:10:34 2011
@@ -0,0 +1,74 @@
+package org.apache.flume.sink;
+
+import java.io.File;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestRollingFileSink {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(TestRollingFileSink.class);
+
+ private File tmpDir;
+ private RollingFileSink sink;
+
+ @Before
+ public void setUp() {
+ tmpDir = new File("/tmp/flume-rfs-" + System.currentTimeMillis() + "-"
+ + Thread.currentThread().getId());
+
+ sink = new RollingFileSink();
+
+ tmpDir.mkdirs();
+
+ sink.setDirectory(tmpDir);
+ }
+
+ @After
+ public void tearDown() {
+ tmpDir.delete();
+ }
+
+ @Test
+ public void testLifecycle() throws InterruptedException, LifecycleException {
+ Context context = new Context();
+
+ sink.open(context);
+ sink.close(context);
+ }
+
+ @Test
+ public void testAppend() throws InterruptedException, LifecycleException,
+ EventDeliveryException {
+
+ Context context = new Context();
+
+ sink.setRollInterval(1);
+
+ sink.open(context);
+
+ for (int i = 0; i < 10; i++) {
+ Event event = new SimpleEvent();
+
+ event.setBody(("Test event " + i).getBytes());
+ sink.append(context, event);
+
+ Thread.sleep(500);
+ }
+
+ sink.close(context);
+
+ for (String file : sink.getDirectory().list()) {
+ logger.debug("Produced file:{}", file);
+ }
+ }
+}