You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/23 00:04:00 UTC
svn commit: r1160466 - in
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability:
./ FileBasedWAL.java FileBasedWALManager.java
Author: esammer
Date: Mon Aug 22 22:04:00 2011
New Revision: 1160466
URL: http://svn.apache.org/viewvc?rev=1160466&view=rev
Log:
- Skeleton of a file-based WAL implementation. Still fuzzy.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java Mon Aug 22 22:04:00 2011
@@ -0,0 +1,129 @@
+package org.apache.flume.durability;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter;
+import org.apache.flume.formatter.output.EventFormatter;
+import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class FileBasedWAL {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(FileBasedWAL.class);
+
+ private File baseDirectory;
+ private EventFormatter formatter;
+
+ private File openDirectory;
+ private File pendingDirectory;
+ private File sentDirectory;
+ private File completeDirectory;
+ private boolean isInitialized;
+
+ public FileBasedWAL(File baseDirectory) {
+ Preconditions.checkNotNull(baseDirectory,
+ "WAL base directory may not be null");
+
+ this.baseDirectory = baseDirectory;
+
+ openDirectory = new File(baseDirectory, "open");
+ pendingDirectory = new File(baseDirectory, "pending");
+ sentDirectory = new File(baseDirectory, "sent");
+ completeDirectory = new File(baseDirectory, "complete");
+
+ /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */
+ formatter = new TextDelimitedOutputFormatter();
+
+ isInitialized = false;
+ }
+
+ public void initialize() throws IOException {
+ logger.info("Initializing file-based WAL at {}", baseDirectory);
+
+ /*
+ * NB: We purposefully check pathological (hey, that's a pun!) error cases
+ * to improve error messages. Resist the urge to condense these checks.
+ * Resist the urge to just use mkdirs(); it could potentially expose
+ * sensitive data (i.e. path creation / permission setting races).
+ */
+ File parentDirectory = baseDirectory.getParentFile();
+
+ Preconditions.checkState(parentDirectory.exists(), "WAL parent directory ("
+ + parentDirectory + ") does not exist");
+ Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent ("
+ + parentDirectory + " ) is not a directory");
+
+ if (!baseDirectory.exists()) {
+ if (!baseDirectory.mkdir()) {
+ throw new IOException("Unable to create WAL base directory "
+ + baseDirectory);
+ }
+ } else {
+ Preconditions.checkState(baseDirectory.isDirectory(),
+ "WAL base directory " + baseDirectory
+ + " exists but it isn't a directory");
+ }
+
+ openDirectory.mkdir();
+ Preconditions.checkState(openDirectory.exists(),
+ "Directory doesn't exist: %s", openDirectory);
+
+ pendingDirectory.mkdir();
+ Preconditions.checkState(pendingDirectory.exists(),
+ "Directory doesn't exist: %s", pendingDirectory);
+
+ sentDirectory.mkdir();
+ Preconditions.checkState(sentDirectory.exists(),
+ "Directory doesn't exist: %s", sentDirectory);
+
+ completeDirectory.mkdir();
+ Preconditions.checkState(completeDirectory.exists(),
+ "Directory doesn't exist: %s", completeDirectory);
+ }
+
+ public FileBasedWALWriter getWriter() throws IOException {
+ FileBasedWALWriter writer = new FileBasedWALWriter();
+
+ writer.setFormatter(formatter);
+
+ if (!isInitialized) {
+ initialize();
+ }
+
+ return writer;
+ }
+
+ @Override
+ public String toString() {
+ return "{ baseDirectory:" + baseDirectory + " openDirectory:"
+ + openDirectory + " pendingDirectory:" + pendingDirectory
+ + " sentDirectory:" + sentDirectory + " completeDirectory:"
+ + completeDirectory + " isInitialized:" + isInitialized + " }";
+ }
+
+ public File getBaseDirectory() {
+ return baseDirectory;
+ }
+
+ public File getOpenDirectory() {
+ return openDirectory;
+ }
+
+ public File getPendingDirectory() {
+ return pendingDirectory;
+ }
+
+ public File getSentDirectory() {
+ return sentDirectory;
+ }
+
+ public File getCompleteDirectory() {
+ return completeDirectory;
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java Mon Aug 22 22:04:00 2011
@@ -0,0 +1,76 @@
+package org.apache.flume.durability;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.formatter.output.EventFormatter;
+
+public class FileBasedWALManager {
+
+ private File directory;
+
+ public FileBasedWAL getWAL(String name) {
+ File walDirectory = new File(directory, name);
+ FileBasedWAL wal = new FileBasedWAL(walDirectory);
+
+ return wal;
+ }
+
+ @Override
+ public String toString() {
+ return "{ directory:" + directory + " }";
+ }
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ public static class FileBasedWALWriter {
+
+ private File file;
+ private BufferedOutputStream outputStream;
+ private EventFormatter formatter;
+
+ public void open() throws FileNotFoundException {
+ outputStream = new BufferedOutputStream(new FileOutputStream(file));
+ }
+
+ public void write(Event event) throws IOException {
+ outputStream.write(formatter.format(event));
+ }
+
+ public void close() throws IOException {
+ outputStream.close();
+ }
+
+ public void flush() throws IOException {
+ outputStream.flush();
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public void setFile(File file) {
+ this.file = file;
+ }
+
+ public EventFormatter getFormatter() {
+ return formatter;
+ }
+
+ public void setFormatter(EventFormatter formatter) {
+ this.formatter = formatter;
+ }
+
+ }
+
+}