You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/23 00:04:00 UTC

svn commit: r1160466 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability: ./ FileBasedWAL.java FileBasedWALManager.java

Author: esammer
Date: Mon Aug 22 22:04:00 2011
New Revision: 1160466

URL: http://svn.apache.org/viewvc?rev=1160466&view=rev
Log:
- Skeleton of a file-based WAL implementation. Still fuzzy.

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java Mon Aug 22 22:04:00 2011
@@ -0,0 +1,188 @@
+package org.apache.flume.durability;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter;
+import org.apache.flume.formatter.output.EventFormatter;
+import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A write-ahead log (WAL) rooted at a single base directory on the local
+ * file system.
+ * <p>
+ * The base directory holds four subdirectories named {@code open},
+ * {@code pending}, {@code sent} and {@code complete}. They are created by
+ * {@link #initialize()}, which {@link #getWriter()} runs on first use.
+ * </p>
+ * <p>
+ * This class performs no synchronization of its own.
+ * </p>
+ */
+public class FileBasedWAL {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(FileBasedWAL.class);
+
+  private final File baseDirectory;
+  private final EventFormatter formatter;
+
+  private final File openDirectory;
+  private final File pendingDirectory;
+  private final File sentDirectory;
+  private final File completeDirectory;
+  private boolean isInitialized;
+
+  /**
+   * Creates a WAL rooted at {@code baseDirectory}. Nothing is created on disk
+   * until {@link #initialize()} runs.
+   *
+   * @param baseDirectory root directory of this WAL; may not be null
+   * @throws NullPointerException if {@code baseDirectory} is null
+   */
+  public FileBasedWAL(File baseDirectory) {
+    Preconditions.checkNotNull(baseDirectory,
+        "WAL base directory may not be null");
+
+    this.baseDirectory = baseDirectory;
+
+    openDirectory = new File(baseDirectory, "open");
+    pendingDirectory = new File(baseDirectory, "pending");
+    sentDirectory = new File(baseDirectory, "sent");
+    completeDirectory = new File(baseDirectory, "complete");
+
+    /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */
+    formatter = new TextDelimitedOutputFormatter();
+
+    isInitialized = false;
+  }
+
+  /**
+   * Creates the base directory (if missing) and the four state
+   * subdirectories, then marks this WAL as initialized.
+   *
+   * @throws IOException if the base directory is missing and cannot be
+   *         created
+   * @throws IllegalStateException if the parent of the base directory is
+   *         missing or not a directory, or if any WAL directory is missing or
+   *         is not a directory after the creation attempt
+   */
+  public void initialize() throws IOException {
+    logger.info("Initializing file-based WAL at {}", baseDirectory);
+
+    /*
+     * NB: We purposefully check pathological (hey, that's a pun!) error cases
+     * to improve error messages. Resist the urge to condense these checks.
+     * Resist the urge to just use mkdirs(); it could potentially expose
+     * sensitive data (i.e. path creation / permission setting races).
+     *
+     * The parent is resolved from the absolute path because getParentFile()
+     * returns null for a relative, single-component path such as "wal".
+     */
+    File parentDirectory = baseDirectory.getAbsoluteFile().getParentFile();
+
+    Preconditions.checkState(parentDirectory != null,
+        "WAL base directory (" + baseDirectory + ") has no parent directory");
+    Preconditions.checkState(parentDirectory.exists(), "WAL parent directory ("
+        + parentDirectory + ") does not exist");
+    Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent ("
+        + parentDirectory + ") is not a directory");
+
+    if (!baseDirectory.exists()) {
+      if (!baseDirectory.mkdir()) {
+        throw new IOException("Unable to create WAL base directory "
+            + baseDirectory);
+      }
+    } else {
+      Preconditions.checkState(baseDirectory.isDirectory(),
+          "WAL base directory " + baseDirectory
+              + " exists but it isn't a directory");
+    }
+
+    ensureSubdirectory(openDirectory);
+    ensureSubdirectory(pendingDirectory);
+    ensureSubdirectory(sentDirectory);
+    ensureSubdirectory(completeDirectory);
+
+    /* Without this flag every getWriter() call would re-run the checks. */
+    isInitialized = true;
+  }
+
+  /**
+   * Creates {@code directory} if it is missing and verifies that the path is
+   * a directory afterwards.
+   */
+  private static void ensureSubdirectory(File directory) {
+    /*
+     * mkdir() also returns false when the path already exists, so its result
+     * is ignored and the resulting state is checked instead.
+     */
+    directory.mkdir();
+    Preconditions.checkState(directory.exists(),
+        "Directory doesn't exist: %s", directory);
+    Preconditions.checkState(directory.isDirectory(),
+        "Path exists but isn't a directory: %s", directory);
+  }
+
+  /**
+   * Returns a new writer configured with this WAL's event formatter,
+   * initializing the WAL directories first if that has not happened yet.
+   * <p>
+   * No target file is assigned to the writer here; callers must call
+   * {@code setFile} on it before {@code open()}.
+   * </p>
+   *
+   * @return a new, unopened writer
+   * @throws IOException if the WAL base directory cannot be created
+   * @throws IllegalStateException if initialization finds a missing or
+   *         non-directory path
+   */
+  public FileBasedWALWriter getWriter() throws IOException {
+    if (!isInitialized) {
+      initialize();
+    }
+
+    FileBasedWALWriter writer = new FileBasedWALWriter();
+    writer.setFormatter(formatter);
+
+    return writer;
+  }
+
+  @Override
+  public String toString() {
+    return "{ baseDirectory:" + baseDirectory + " openDirectory:"
+        + openDirectory + " pendingDirectory:" + pendingDirectory
+        + " sentDirectory:" + sentDirectory + " completeDirectory:"
+        + completeDirectory + " isInitialized:" + isInitialized + " }";
+  }
+
+  /** @return the root directory of this WAL */
+  public File getBaseDirectory() {
+    return baseDirectory;
+  }
+
+  /** @return the {@code open} subdirectory of the base directory */
+  public File getOpenDirectory() {
+    return openDirectory;
+  }
+
+  /** @return the {@code pending} subdirectory of the base directory */
+  public File getPendingDirectory() {
+    return pendingDirectory;
+  }
+
+  /** @return the {@code sent} subdirectory of the base directory */
+  public File getSentDirectory() {
+    return sentDirectory;
+  }
+
+  /** @return the {@code complete} subdirectory of the base directory */
+  public File getCompleteDirectory() {
+    return completeDirectory;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java Mon Aug 22 22:04:00 2011
@@ -0,0 +1,175 @@
+package org.apache.flume.durability;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.formatter.output.EventFormatter;
+
+/**
+ * Hands out {@link FileBasedWAL} instances, each rooted at a named
+ * subdirectory of a common parent directory.
+ */
+public class FileBasedWALManager {
+
+  private File directory;
+
+  /**
+   * Returns a WAL whose base directory is {@code name} resolved against this
+   * manager's directory. A new instance is created on every call and nothing
+   * is touched on disk here.
+   *
+   * @param name name of the WAL's directory; may not be null
+   * @return a new WAL rooted at {@code directory/name}
+   * @throws NullPointerException if {@code name} is null
+   * @throws IllegalStateException if no directory has been set
+   */
+  public FileBasedWAL getWAL(String name) {
+    if (name == null) {
+      throw new NullPointerException("WAL name may not be null");
+    }
+
+    /*
+     * new File((File) null, name) silently resolves against the working
+     * directory, which would scatter WAL data wherever the process started.
+     */
+    if (directory == null) {
+      throw new IllegalStateException(
+          "WAL manager directory has not been set");
+    }
+
+    return new FileBasedWAL(new File(directory, name));
+  }
+
+  @Override
+  public String toString() {
+    return "{ directory:" + directory + " }";
+  }
+
+  /** @return the parent directory of all WALs, or null if not yet set */
+  public File getDirectory() {
+    return directory;
+  }
+
+  /** @param directory parent directory under which WALs are rooted */
+  public void setDirectory(File directory) {
+    this.directory = directory;
+  }
+
+  /**
+   * Writes formatted events to a single file through a buffered stream.
+   * <p>
+   * Lifecycle: {@link #setFile(File)} and
+   * {@link #setFormatter(EventFormatter)}, then {@link #open()}, any number
+   * of {@link #write(Event)} / {@link #flush()} calls, and finally
+   * {@link #close()}. This class performs no synchronization of its own.
+   * </p>
+   */
+  public static class FileBasedWALWriter {
+
+    private File file;
+    private BufferedOutputStream outputStream;
+    private EventFormatter formatter;
+
+    /**
+     * Opens the configured file for writing. As with any
+     * {@link FileOutputStream} opened without the append flag, existing
+     * content of the file is discarded.
+     *
+     * @throws FileNotFoundException if the file cannot be opened for writing
+     * @throws IllegalStateException if no file has been set or the writer is
+     *         already open
+     */
+    public void open() throws FileNotFoundException {
+      if (file == null) {
+        throw new IllegalStateException("No file set on WAL writer");
+      }
+      /* Re-opening would orphan the previous stream and leak its handle. */
+      if (outputStream != null) {
+        throw new IllegalStateException("WAL writer is already open: " + file);
+      }
+
+      outputStream = new BufferedOutputStream(new FileOutputStream(file));
+    }
+
+    /**
+     * Formats {@code event} with the configured formatter and appends the
+     * result to the buffered stream.
+     *
+     * @param event the event to write
+     * @throws IOException if the underlying stream fails
+     * @throws IllegalStateException if the writer is not open or has no
+     *         formatter
+     */
+    public void write(Event event) throws IOException {
+      BufferedOutputStream out = requireOpen();
+      if (formatter == null) {
+        throw new IllegalStateException("No formatter set on WAL writer");
+      }
+
+      out.write(formatter.format(event));
+    }
+
+    /**
+     * Flushes and closes the stream. Closing a writer that is not open is a
+     * no-op, and the writer may be opened again afterwards.
+     *
+     * @throws IOException if flushing or closing the stream fails
+     */
+    public void close() throws IOException {
+      if (outputStream == null) {
+        return;
+      }
+
+      try {
+        outputStream.close();
+      } finally {
+        /* Drop the reference even on failure so the stream is not reused. */
+        outputStream = null;
+      }
+    }
+
+    /**
+     * Pushes buffered bytes to the underlying file stream.
+     *
+     * @throws IOException if the underlying stream fails
+     * @throws IllegalStateException if the writer is not open
+     */
+    public void flush() throws IOException {
+      requireOpen().flush();
+    }
+
+    /** Returns the open stream or fails with a descriptive error. */
+    private BufferedOutputStream requireOpen() {
+      if (outputStream == null) {
+        throw new IllegalStateException("WAL writer is not open: " + file);
+      }
+      return outputStream;
+    }
+
+    /** @return the file this writer targets, or null if not yet set */
+    public File getFile() {
+      return file;
+    }
+
+    /** @param file the file {@link #open()} will write to */
+    public void setFile(File file) {
+      this.file = file;
+    }
+
+    /** @return the formatter used by {@link #write(Event)}, or null */
+    public EventFormatter getFormatter() {
+      return formatter;
+    }
+
+    /** @param formatter the formatter {@link #write(Event)} will use */
+    public void setFormatter(EventFormatter formatter) {
+      this.formatter = formatter;
+    }
+
+  }
+
+}