You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by Jonathan Hsieh <jo...@cloudera.com> on 2011/08/23 15:00:57 UTC
Name the package wal? [Re: svn commit: r1160466 - in
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability:
./ FileBasedWAL.java FileBasedWALManager.java]
This was something I wanted to rename -- maybe make it org.apache.flume.wal?
Jon.
On Mon, Aug 22, 2011 at 3:04 PM, <es...@apache.org> wrote:
> Author: esammer
> Date: Mon Aug 22 22:04:00 2011
> New Revision: 1160466
>
> URL: http://svn.apache.org/viewvc?rev=1160466&view=rev
> Log:
> - Skeleton of a file-based WAL implementation. Still fuzzy.
>
> Added:
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
>
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
>
> Added:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> (added)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> Mon Aug 22 22:04:00 2011
> @@ -0,0 +1,129 @@
> +package org.apache.flume.durability;
> +
> +import java.io.File;
> +import java.io.IOException;
> +
> +import org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter;
> +import org.apache.flume.formatter.output.EventFormatter;
> +import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class FileBasedWAL {
> +
> + private static final Logger logger = LoggerFactory
> + .getLogger(FileBasedWAL.class);
> +
> + private File baseDirectory;
> + private EventFormatter formatter;
> +
> + private File openDirectory;
> + private File pendingDirectory;
> + private File sentDirectory;
> + private File completeDirectory;
> + private boolean isInitialized;
> +
> + public FileBasedWAL(File baseDirectory) {
> + Preconditions.checkNotNull(baseDirectory,
> + "WAL base directory may not be null");
> +
> + this.baseDirectory = baseDirectory;
> +
> + openDirectory = new File(baseDirectory, "open");
> + pendingDirectory = new File(baseDirectory, "pending");
> + sentDirectory = new File(baseDirectory, "sent");
> + completeDirectory = new File(baseDirectory, "complete");
> +
> + /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */
> + formatter = new TextDelimitedOutputFormatter();
> +
> + isInitialized = false;
> + }
> +
> + public void initialize() throws IOException {
> + logger.info("Initializing file-based WAL at {}", baseDirectory);
> +
> + /*
> + * NB: We purposefully check pathological (hey, that's a pun!) error cases
> + * to improve error messages. Resist the urge to condense these checks.
> + * Resist the urge to just use mkdirs(); it could potentially expose
> + * sensitive data (i.e. path creation / permission setting races).
> + */
> + File parentDirectory = baseDirectory.getParentFile();
> +
> + Preconditions.checkState(parentDirectory.exists(), "WAL parent directory ("
> + + parentDirectory + ") does not exist");
> + Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent ("
> + + parentDirectory + " ) is not a directory");
> +
> + if (!baseDirectory.exists()) {
> + if (!baseDirectory.mkdir()) {
> + throw new IOException("Unable to create WAL base directory "
> + + baseDirectory);
> + }
> + } else {
> + Preconditions.checkState(baseDirectory.isDirectory(),
> + "WAL base directory " + baseDirectory
> + + " exists but it isn't a directory");
> + }
> +
> + openDirectory.mkdir();
> + Preconditions.checkState(openDirectory.exists(),
> + "Directory doesn't exist: %s", openDirectory);
> +
> + pendingDirectory.mkdir();
> + Preconditions.checkState(pendingDirectory.exists(),
> + "Directory doesn't exist: %s", pendingDirectory);
> +
> + sentDirectory.mkdir();
> + Preconditions.checkState(sentDirectory.exists(),
> + "Directory doesn't exist: %s", sentDirectory);
> +
> + completeDirectory.mkdir();
> + Preconditions.checkState(completeDirectory.exists(),
> + "Directory doesn't exist: %s", completeDirectory);
> + }
> +
> + public FileBasedWALWriter getWriter() throws IOException {
> + FileBasedWALWriter writer = new FileBasedWALWriter();
> +
> + writer.setFormatter(formatter);
> +
> + if (!isInitialized) {
> + initialize();
> + }
> +
> + return writer;
> + }
> +
> + @Override
> + public String toString() {
> + return "{ baseDirectory:" + baseDirectory + " openDirectory:"
> + + openDirectory + " pendingDirectory:" + pendingDirectory
> + + " sentDirectory:" + sentDirectory + " completeDirectory:"
> + + completeDirectory + " isInitialized:" + isInitialized + " }";
> + }
> +
> + public File getBaseDirectory() {
> + return baseDirectory;
> + }
> +
> + public File getOpenDirectory() {
> + return openDirectory;
> + }
> +
> + public File getPendingDirectory() {
> + return pendingDirectory;
> + }
> +
> + public File getSentDirectory() {
> + return sentDirectory;
> + }
> +
> + public File getCompleteDirectory() {
> + return completeDirectory;
> + }
> +
> +}
>
> Added:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> (added)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> Mon Aug 22 22:04:00 2011
> @@ -0,0 +1,76 @@
> +package org.apache.flume.durability;
> +
> +import java.io.BufferedOutputStream;
> +import java.io.File;
> +import java.io.FileNotFoundException;
> +import java.io.FileOutputStream;
> +import java.io.IOException;
> +
> +import org.apache.flume.Event;
> +import org.apache.flume.formatter.output.EventFormatter;
> +
> +public class FileBasedWALManager {
> +
> + private File directory;
> +
> + public FileBasedWAL getWAL(String name) {
> + File walDirectory = new File(directory, name);
> + FileBasedWAL wal = new FileBasedWAL(walDirectory);
> +
> + return wal;
> + }
> +
> + @Override
> + public String toString() {
> + return "{ directory:" + directory + " }";
> + }
> +
> + public File getDirectory() {
> + return directory;
> + }
> +
> + public void setDirectory(File directory) {
> + this.directory = directory;
> + }
> +
> + public static class FileBasedWALWriter {
> +
> + private File file;
> + private BufferedOutputStream outputStream;
> + private EventFormatter formatter;
> +
> + public void open() throws FileNotFoundException {
> + outputStream = new BufferedOutputStream(new FileOutputStream(file));
> + }
> +
> + public void write(Event event) throws IOException {
> + outputStream.write(formatter.format(event));
> + }
> +
> + public void close() throws IOException {
> + outputStream.close();
> + }
> +
> + public void flush() throws IOException {
> + outputStream.flush();
> + }
> +
> + public File getFile() {
> + return file;
> + }
> +
> + public void setFile(File file) {
> + this.file = file;
> + }
> +
> + public EventFormatter getFormatter() {
> + return formatter;
> + }
> +
> + public void setFormatter(EventFormatter formatter) {
> + this.formatter = formatter;
> + }
> +
> + }
> +
> +}
>
>
>
--
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera
// jon@cloudera.com
Re: Name the package wal? [Re: svn commit: r1160466 - in
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability:
./ FileBasedWAL.java FileBasedWALManager.java]
Posted by Eric Sammer <es...@cloudera.com>.
Sure. Makes sense.
On Tue, Aug 23, 2011 at 6:00 AM, Jonathan Hsieh <jo...@cloudera.com> wrote:
> this was something I wanted to rename -- maybe make it
> org.apache.flume.wal?
>
> Jon.
>
> On Mon, Aug 22, 2011 at 3:04 PM, <es...@apache.org> wrote:
>
> > Author: esammer
> > Date: Mon Aug 22 22:04:00 2011
> > New Revision: 1160466
> >
> > URL: http://svn.apache.org/viewvc?rev=1160466&view=rev
> > Log:
> > - Skeleton of a file-based WAL implementation. Still fuzzy.
> >
> > Added:
> >
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/
> >
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> >
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> >
> > Added:
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> > URL:
> >
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto
> >
> >
> ==============================================================================
> > ---
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> > (added)
> > +++
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> > Mon Aug 22 22:04:00 2011
> > @@ -0,0 +1,129 @@
> > +package org.apache.flume.durability;
> > +
> > +import java.io.File;
> > +import java.io.IOException;
> > +
> > +import org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter;
> > +import org.apache.flume.formatter.output.EventFormatter;
> > +import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> > +
> > +import com.google.common.base.Preconditions;
> > +
> > +public class FileBasedWAL {
> > +
> > + private static final Logger logger = LoggerFactory
> > + .getLogger(FileBasedWAL.class);
> > +
> > + private File baseDirectory;
> > + private EventFormatter formatter;
> > +
> > + private File openDirectory;
> > + private File pendingDirectory;
> > + private File sentDirectory;
> > + private File completeDirectory;
> > + private boolean isInitialized;
> > +
> > + public FileBasedWAL(File baseDirectory) {
> > + Preconditions.checkNotNull(baseDirectory,
> > + "WAL base directory may not be null");
> > +
> > + this.baseDirectory = baseDirectory;
> > +
> > + openDirectory = new File(baseDirectory, "open");
> > + pendingDirectory = new File(baseDirectory, "pending");
> > + sentDirectory = new File(baseDirectory, "sent");
> > + completeDirectory = new File(baseDirectory, "complete");
> > +
> > + /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */
> > + formatter = new TextDelimitedOutputFormatter();
> > +
> > + isInitialized = false;
> > + }
> > +
> > + public void initialize() throws IOException {
> > + logger.info("Initializing file-based WAL at {}", baseDirectory);
> > +
> > + /*
> > + * NB: We purposefully check pathological (hey, that's a pun!) error cases
> > + * to improve error messages. Resist the urge to condense these checks.
> > + * Resist the urge to just use mkdirs(); it could potentially expose
> > + * sensitive data (i.e. path creation / permission setting races).
> > + */
> > + File parentDirectory = baseDirectory.getParentFile();
> > +
> > + Preconditions.checkState(parentDirectory.exists(), "WAL parent directory ("
> > + + parentDirectory + ") does not exist");
> > + Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent ("
> > + + parentDirectory + " ) is not a directory");
> > +
> > + if (!baseDirectory.exists()) {
> > + if (!baseDirectory.mkdir()) {
> > + throw new IOException("Unable to create WAL base directory "
> > + + baseDirectory);
> > + }
> > + } else {
> > + Preconditions.checkState(baseDirectory.isDirectory(),
> > + "WAL base directory " + baseDirectory
> > + + " exists but it isn't a directory");
> > + }
> > +
> > + openDirectory.mkdir();
> > + Preconditions.checkState(openDirectory.exists(),
> > + "Directory doesn't exist: %s", openDirectory);
> > +
> > + pendingDirectory.mkdir();
> > + Preconditions.checkState(pendingDirectory.exists(),
> > + "Directory doesn't exist: %s", pendingDirectory);
> > +
> > + sentDirectory.mkdir();
> > + Preconditions.checkState(sentDirectory.exists(),
> > + "Directory doesn't exist: %s", sentDirectory);
> > +
> > + completeDirectory.mkdir();
> > + Preconditions.checkState(completeDirectory.exists(),
> > + "Directory doesn't exist: %s", completeDirectory);
> > + }
> > +
> > + public FileBasedWALWriter getWriter() throws IOException {
> > + FileBasedWALWriter writer = new FileBasedWALWriter();
> > +
> > + writer.setFormatter(formatter);
> > +
> > + if (!isInitialized) {
> > + initialize();
> > + }
> > +
> > + return writer;
> > + }
> > +
> > + @Override
> > + public String toString() {
> > + return "{ baseDirectory:" + baseDirectory + " openDirectory:"
> > + + openDirectory + " pendingDirectory:" + pendingDirectory
> > + + " sentDirectory:" + sentDirectory + " completeDirectory:"
> > + + completeDirectory + " isInitialized:" + isInitialized + " }";
> > + }
> > +
> > + public File getBaseDirectory() {
> > + return baseDirectory;
> > + }
> > +
> > + public File getOpenDirectory() {
> > + return openDirectory;
> > + }
> > +
> > + public File getPendingDirectory() {
> > + return pendingDirectory;
> > + }
> > +
> > + public File getSentDirectory() {
> > + return sentDirectory;
> > + }
> > +
> > + public File getCompleteDirectory() {
> > + return completeDirectory;
> > + }
> > +
> > +}
> >
> > Added:
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> > URL:
> >
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto
> >
> >
> ==============================================================================
> > ---
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> > (added)
> > +++
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> > Mon Aug 22 22:04:00 2011
> > @@ -0,0 +1,76 @@
> > +package org.apache.flume.durability;
> > +
> > +import java.io.BufferedOutputStream;
> > +import java.io.File;
> > +import java.io.FileNotFoundException;
> > +import java.io.FileOutputStream;
> > +import java.io.IOException;
> > +
> > +import org.apache.flume.Event;
> > +import org.apache.flume.formatter.output.EventFormatter;
> > +
> > +public class FileBasedWALManager {
> > +
> > + private File directory;
> > +
> > + public FileBasedWAL getWAL(String name) {
> > + File walDirectory = new File(directory, name);
> > + FileBasedWAL wal = new FileBasedWAL(walDirectory);
> > +
> > + return wal;
> > + }
> > +
> > + @Override
> > + public String toString() {
> > + return "{ directory:" + directory + " }";
> > + }
> > +
> > + public File getDirectory() {
> > + return directory;
> > + }
> > +
> > + public void setDirectory(File directory) {
> > + this.directory = directory;
> > + }
> > +
> > + public static class FileBasedWALWriter {
> > +
> > + private File file;
> > + private BufferedOutputStream outputStream;
> > + private EventFormatter formatter;
> > +
> > + public void open() throws FileNotFoundException {
> > + outputStream = new BufferedOutputStream(new FileOutputStream(file));
> > + }
> > +
> > + public void write(Event event) throws IOException {
> > + outputStream.write(formatter.format(event));
> > + }
> > +
> > + public void close() throws IOException {
> > + outputStream.close();
> > + }
> > +
> > + public void flush() throws IOException {
> > + outputStream.flush();
> > + }
> > +
> > + public File getFile() {
> > + return file;
> > + }
> > +
> > + public void setFile(File file) {
> > + this.file = file;
> > + }
> > +
> > + public EventFormatter getFormatter() {
> > + return formatter;
> > + }
> > +
> > + public void setFormatter(EventFormatter formatter) {
> > + this.formatter = formatter;
> > + }
> > +
> > + }
> > +
> > +}
> >
> >
> >
>
>
> --
> // Jonathan Hsieh (shay)
> // Software Engineer, Cloudera
> // jon@cloudera.com
>
--
Eric Sammer
twitter: esammer
data: www.cloudera.com