You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/25 20:50:02 UTC
svn commit: r1161689 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java
Author: esammer
Date: Thu Aug 25 18:50:02 2011
New Revision: 1161689
URL: http://svn.apache.org/viewvc?rev=1161689&view=rev
Log:
- Made FileBasedWALWriter internally roll-ish so WALWriter consumers can be dumb about batching.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java?rev=1161689&r1=1161688&r2=1161689&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/file/FileBasedWALWriter.java Thu Aug 25 18:50:02 2011
@@ -2,46 +2,86 @@ package org.apache.flume.durability.file
import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.flume.Event;
import org.apache.flume.durability.WALWriter;
import org.apache.flume.formatter.output.EventFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
public class FileBasedWALWriter implements WALWriter {
- private File file;
- private BufferedOutputStream outputStream;
+ private static final Logger logger = LoggerFactory
+ .getLogger(FileBasedWALWriter.class);
+ private static final long defaultRollInterval = 30000;
+
+ private File directory;
private EventFormatter formatter;
+ private volatile long rollInterval;
+
+ private File currentFile;
+ private BufferedOutputStream outputStream;
+ private volatile long openTime;
+ private volatile boolean shouldRoll;
+
+ public FileBasedWALWriter() {
+ shouldRoll = false;
+ rollInterval = defaultRollInterval;
+ }
@Override
- public void open() throws FileNotFoundException {
- outputStream = new BufferedOutputStream(new FileOutputStream(file));
+ public void open() throws IOException {
+ Preconditions.checkState(directory != null,
+ "Directory must be configured prior to opening.");
+
+ logger.debug("Opening WAL {}", directory);
+
+ long now = System.currentTimeMillis();
+
+ currentFile = new File(directory, String.valueOf(now) + "-"
+ + Thread.currentThread().getId());
+ outputStream = new BufferedOutputStream(new FileOutputStream(currentFile));
+ openTime = now;
}
@Override
public void write(Event event) throws IOException {
+ if (shouldRoll) {
+ close();
+ shouldRoll = false;
+ }
+
+ if (outputStream == null) {
+ open();
+ }
+
outputStream.write(formatter.format(event));
}
@Override
public void close() throws IOException {
- outputStream.close();
+ // Closing a writer that has nothing open is a no-op, so a repeated close(),
+ // or a roll requested after an explicit close(), cannot hit a null stream.
+ if (outputStream == null) {
+ return;
+ }
+
+ try {
+ outputStream.close();
+ } finally {
+ // Drop the reference even if close() throws, so the next write() opens a
+ // fresh file rather than writing to a stream that failed to close.
+ outputStream = null;
+ }
}
@Override
public void flush() throws IOException {
+ Preconditions.checkState(outputStream != null,
+ "Attempt to flush an unopen WAL: %s", currentFile);
+
outputStream.flush();
}
public File getFile() {
- return file;
+ return directory;
}
public void setFile(File file) {
- this.file = file;
+ this.directory = file;
}
public EventFormatter getFormatter() {
@@ -52,4 +92,18 @@ public class FileBasedWALWriter implemen
this.formatter = formatter;
}
+ /**
+ * Periodic task that flags the enclosing writer for a roll once the current
+ * WAL file has been open for at least rollInterval milliseconds. It only sets
+ * shouldRoll; write() performs the actual close and reopen on its next call.
+ */
+ public class RollCheckRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ // Read the volatile once so the open check and the age check agree.
+ long opened = openTime;
+
+ // In the code shown here openTime is only assigned by open(), so it is
+ // still 0 for a writer that was never opened; nothing to roll in that case.
+ if (opened == 0) {
+ return;
+ }
+
+ // Roll only after the interval has elapsed. The earlier comparison
+ // (openTime + rollInterval >= now) was inverted and fired for young files.
+ if (System.currentTimeMillis() - opened >= rollInterval) {
+ logger.debug("Marking time to roll");
+
+ shouldRoll = true;
+ }
+ }
+ }
+
}