You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/25 20:50:02 UTC

svn commit: r1161689 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java

Author: esammer
Date: Thu Aug 25 18:50:02 2011
New Revision: 1161689

URL: http://svn.apache.org/viewvc?rev=1161689&view=rev
Log:
- Made FileBasedWALWriter internally roll-ish so WALWriter consumers can be dumb about batching.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java?rev=1161689&r1=1161688&r2=1161689&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java Thu Aug 25 18:50:02 2011
@@ -2,46 +2,86 @@ package org.apache.flume.durability.file
 
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.apache.flume.Event;
 import org.apache.flume.durability.WALWriter;
 import org.apache.flume.formatter.output.EventFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
 
 public class FileBasedWALWriter implements WALWriter {
 
-  private File file;
-  private BufferedOutputStream outputStream;
+  private static final Logger logger = LoggerFactory
+      .getLogger(FileBasedWALWriter.class);
+  private static final long defaultRollInterval = 30000;
+
+  private File directory;
   private EventFormatter formatter;
+  private volatile long rollInterval;
+
+  private File currentFile;
+  private BufferedOutputStream outputStream;
+  private volatile long openTime;
+  private volatile boolean shouldRoll;
+
+  public FileBasedWALWriter() {
+    shouldRoll = false;
+    rollInterval = defaultRollInterval;
+  }
 
   @Override
-  public void open() throws FileNotFoundException {
-    outputStream = new BufferedOutputStream(new FileOutputStream(file));
+  public void open() throws IOException {
+    Preconditions.checkState(directory != null,
+        "Directory must be configured prior to opening.");
+
+    logger.debug("Opening WAL {}", directory);
+
+    long now = System.currentTimeMillis();
+
+    currentFile = new File(directory, String.valueOf(now) + "-"
+        + Thread.currentThread().getId());
+    outputStream = new BufferedOutputStream(new FileOutputStream(currentFile));
+    openTime = now;
   }
 
   @Override
   public void write(Event event) throws IOException {
+    if (shouldRoll) {
+      close();
+      shouldRoll = false;
+    }
+
+    if (outputStream == null) {
+      open();
+    }
+
     outputStream.write(formatter.format(event));
   }
 
   @Override
   public void close() throws IOException {
     outputStream.close();
+    outputStream = null;
   }
 
   @Override
   public void flush() throws IOException {
+    Preconditions.checkState(outputStream != null,
+        "Attempt to flush an unopen WAL: %s", currentFile);
+
     outputStream.flush();
   }
 
   public File getFile() {
-    return file;
+    return directory;
   }
 
   public void setFile(File file) {
-    this.file = file;
+    this.directory = file;
   }
 
   public EventFormatter getFormatter() {
@@ -52,4 +92,18 @@ public class FileBasedWALWriter implemen
     this.formatter = formatter;
   }
 
+  /** Flags the writer to roll once the current file is rollInterval ms old. */
+  public class RollCheckRunnable implements Runnable {
+    @Override
+    public void run() {
+      long now = System.currentTimeMillis();
+      // openTime is 0 until open() has run; never flag a roll before then.
+      // Roll only after the file has been open for the full interval.
+      if (openTime > 0 && now - openTime >= rollInterval) {
+        logger.debug("Marking time to roll");
+        shouldRoll = true;
+      }
+    }
+  }
+
 }