You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by Jonathan Hsieh <jo...@cloudera.com> on 2011/08/23 15:00:57 UTC

Name the package wal? [Re: svn commit: r1160466 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability: ./ FileBasedWAL.java FileBasedWALManager.java]

this was something I wanted to rename -- maybe make it org.apache.flume.wal?

Jon.

On Mon, Aug 22, 2011 at 3:04 PM, <es...@apache.org> wrote:

> Author: esammer
> Date: Mon Aug 22 22:04:00 2011
> New Revision: 1160466
>
> URL: http://svn.apache.org/viewvc?rev=1160466&view=rev
> Log:
> - Skeleton of a file-based WAL implementation. Still fuzzy.
>
> Added:
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
>
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
>
> Added:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> (added)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> Mon Aug 22 22:04:00 2011
> @@ -0,0 +1,129 @@
> +package org.apache.flume.durability;
> +
> +import java.io.File;
> +import java.io.IOException;
> +
> +import org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter;
> +import org.apache.flume.formatter.output.EventFormatter;
> +import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class FileBasedWAL {
> +
> +  private static final Logger logger = LoggerFactory
> +      .getLogger(FileBasedWAL.class);
> +
> +  private File baseDirectory;
> +  private EventFormatter formatter;
> +
> +  private File openDirectory;
> +  private File pendingDirectory;
> +  private File sentDirectory;
> +  private File completeDirectory;
> +  private boolean isInitialized;
> +
> +  public FileBasedWAL(File baseDirectory) {
> +    Preconditions.checkNotNull(baseDirectory,
> +        "WAL base directory may not be null");
> +
> +    this.baseDirectory = baseDirectory;
> +
> +    openDirectory = new File(baseDirectory, "open");
> +    pendingDirectory = new File(baseDirectory, "pending");
> +    sentDirectory = new File(baseDirectory, "sent");
> +    completeDirectory = new File(baseDirectory, "complete");
> +
> +    /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */
> +    formatter = new TextDelimitedOutputFormatter();
> +
> +    isInitialized = false;
> +  }
> +
> +  public void initialize() throws IOException {
> +    logger.info("Initializing file-based WAL at {}", baseDirectory);
> +
> +    /*
> +     * NB: We purposefully check pathological (hey, that's a pun!) error
> cases
> +     * to improve error messages. Resist the urge to condense these
> checks.
> +     * Resist the urge to just use mkdirs(); it could potentially expose
> +     * sensitive data (i.e. path creation / permission setting races).
> +     */
> +    File parentDirectory = baseDirectory.getParentFile();
> +
> +    Preconditions.checkState(parentDirectory.exists(), "WAL parent
> directory ("
> +        + parentDirectory + ") does not exist");
> +    Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent ("
> +        + parentDirectory + " ) is not a directory");
> +
> +    if (!baseDirectory.exists()) {
> +      if (!baseDirectory.mkdir()) {
> +        throw new IOException("Unable to create WAL base directory "
> +            + baseDirectory);
> +      }
> +    } else {
> +      Preconditions.checkState(baseDirectory.isDirectory(),
> +          "WAL base directory " + baseDirectory
> +              + " exists but it isn't a directory");
> +    }
> +
> +    openDirectory.mkdir();
> +    Preconditions.checkState(openDirectory.exists(),
> +        "Directory doesn't exist: %s", openDirectory);
> +
> +    pendingDirectory.mkdir();
> +    Preconditions.checkState(pendingDirectory.exists(),
> +        "Directory doesn't exist: %s", pendingDirectory);
> +
> +    sentDirectory.mkdir();
> +    Preconditions.checkState(sentDirectory.exists(),
> +        "Directory doesn't exist: %s", sentDirectory);
> +
> +    completeDirectory.mkdir();
> +    Preconditions.checkState(completeDirectory.exists(),
> +        "Directory doesn't exist: %s", completeDirectory);
> +  }
> +
> +  public FileBasedWALWriter getWriter() throws IOException {
> +    FileBasedWALWriter writer = new FileBasedWALWriter();
> +
> +    writer.setFormatter(formatter);
> +
> +    if (!isInitialized) {
> +      initialize();
> +    }
> +
> +    return writer;
> +  }
> +
> +  @Override
> +  public String toString() {
> +    return "{ baseDirectory:" + baseDirectory + " openDirectory:"
> +        + openDirectory + " pendingDirectory:" + pendingDirectory
> +        + " sentDirectory:" + sentDirectory + " completeDirectory:"
> +        + completeDirectory + " isInitialized:" + isInitialized + " }";
> +  }
> +
> +  public File getBaseDirectory() {
> +    return baseDirectory;
> +  }
> +
> +  public File getOpenDirectory() {
> +    return openDirectory;
> +  }
> +
> +  public File getPendingDirectory() {
> +    return pendingDirectory;
> +  }
> +
> +  public File getSentDirectory() {
> +    return sentDirectory;
> +  }
> +
> +  public File getCompleteDirectory() {
> +    return completeDirectory;
> +  }
> +
> +}
>
> Added:
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> URL:
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto
>
> ==============================================================================
> ---
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> (added)
> +++
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> Mon Aug 22 22:04:00 2011
> @@ -0,0 +1,76 @@
> +package org.apache.flume.durability;
> +
> +import java.io.BufferedOutputStream;
> +import java.io.File;
> +import java.io.FileNotFoundException;
> +import java.io.FileOutputStream;
> +import java.io.IOException;
> +
> +import org.apache.flume.Event;
> +import org.apache.flume.formatter.output.EventFormatter;
> +
> +public class FileBasedWALManager {
> +
> +  private File directory;
> +
> +  public FileBasedWAL getWAL(String name) {
> +    File walDirectory = new File(directory, name);
> +    FileBasedWAL wal = new FileBasedWAL(walDirectory);
> +
> +    return wal;
> +  }
> +
> +  @Override
> +  public String toString() {
> +    return "{ directory:" + directory + " }";
> +  }
> +
> +  public File getDirectory() {
> +    return directory;
> +  }
> +
> +  public void setDirectory(File directory) {
> +    this.directory = directory;
> +  }
> +
> +  public static class FileBasedWALWriter {
> +
> +    private File file;
> +    private BufferedOutputStream outputStream;
> +    private EventFormatter formatter;
> +
> +    public void open() throws FileNotFoundException {
> +      outputStream = new BufferedOutputStream(new FileOutputStream(file));
> +    }
> +
> +    public void write(Event event) throws IOException {
> +      outputStream.write(formatter.format(event));
> +    }
> +
> +    public void close() throws IOException {
> +      outputStream.close();
> +    }
> +
> +    public void flush() throws IOException {
> +      outputStream.flush();
> +    }
> +
> +    public File getFile() {
> +      return file;
> +    }
> +
> +    public void setFile(File file) {
> +      this.file = file;
> +    }
> +
> +    public EventFormatter getFormatter() {
> +      return formatter;
> +    }
> +
> +    public void setFormatter(EventFormatter formatter) {
> +      this.formatter = formatter;
> +    }
> +
> +  }
> +
> +}
>
>
>


-- 
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera
// jon@cloudera.com

Re: Name the package wal? [Re: svn commit: r1160466 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability: ./ FileBasedWAL.java FileBasedWALManager.java]

Posted by Eric Sammer <es...@cloudera.com>.
Sure. Makes sense.

On Tue, Aug 23, 2011 at 6:00 AM, Jonathan Hsieh <jo...@cloudera.com> wrote:

> this was something I wanted to rename -- maybe make it
> org.apache.flume.wal?
>
> Jon.
>
> On Mon, Aug 22, 2011 at 3:04 PM, <es...@apache.org> wrote:
>
> > Author: esammer
> > Date: Mon Aug 22 22:04:00 2011
> > New Revision: 1160466
> >
> > URL: http://svn.apache.org/viewvc?rev=1160466&view=rev
> > Log:
> > - Skeleton of a file-based WAL implementation. Still fuzzy.
> >
> > Added:
> >
> >
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/
> >
> >
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> >
> >
>  incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> >
> > Added:
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> > URL:
> >
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java?rev=1160466&view=auto
> >
> >
> ==============================================================================
> > ---
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> > (added)
> > +++
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWAL.java
> > Mon Aug 22 22:04:00 2011
> > @@ -0,0 +1,129 @@
> > +package org.apache.flume.durability;
> > +
> > +import java.io.File;
> > +import java.io.IOException;
> > +
> > +import
> org.apache.flume.durability.FileBasedWALManager.FileBasedWALWriter;
> > +import org.apache.flume.formatter.output.EventFormatter;
> > +import org.apache.flume.formatter.output.TextDelimitedOutputFormatter;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> > +
> > +import com.google.common.base.Preconditions;
> > +
> > +public class FileBasedWAL {
> > +
> > +  private static final Logger logger = LoggerFactory
> > +      .getLogger(FileBasedWAL.class);
> > +
> > +  private File baseDirectory;
> > +  private EventFormatter formatter;
> > +
> > +  private File openDirectory;
> > +  private File pendingDirectory;
> > +  private File sentDirectory;
> > +  private File completeDirectory;
> > +  private boolean isInitialized;
> > +
> > +  public FileBasedWAL(File baseDirectory) {
> > +    Preconditions.checkNotNull(baseDirectory,
> > +        "WAL base directory may not be null");
> > +
> > +    this.baseDirectory = baseDirectory;
> > +
> > +    openDirectory = new File(baseDirectory, "open");
> > +    pendingDirectory = new File(baseDirectory, "pending");
> > +    sentDirectory = new File(baseDirectory, "sent");
> > +    completeDirectory = new File(baseDirectory, "complete");
> > +
> > +    /* FIXME: This shouldn't be hardcoded (and shouldn't be text!). */
> > +    formatter = new TextDelimitedOutputFormatter();
> > +
> > +    isInitialized = false;
> > +  }
> > +
> > +  public void initialize() throws IOException {
> > +    logger.info("Initializing file-based WAL at {}", baseDirectory);
> > +
> > +    /*
> > +     * NB: We purposefully check pathological (hey, that's a pun!) error
> > cases
> > +     * to improve error messages. Resist the urge to condense these
> > checks.
> > +     * Resist the urge to just use mkdirs(); it could potentially expose
> > +     * sensitive data (i.e. path creation / permission setting races).
> > +     */
> > +    File parentDirectory = baseDirectory.getParentFile();
> > +
> > +    Preconditions.checkState(parentDirectory.exists(), "WAL parent
> > directory ("
> > +        + parentDirectory + ") does not exist");
> > +    Preconditions.checkState(parentDirectory.isDirectory(), "WAL parent
> ("
> > +        + parentDirectory + " ) is not a directory");
> > +
> > +    if (!baseDirectory.exists()) {
> > +      if (!baseDirectory.mkdir()) {
> > +        throw new IOException("Unable to create WAL base directory "
> > +            + baseDirectory);
> > +      }
> > +    } else {
> > +      Preconditions.checkState(baseDirectory.isDirectory(),
> > +          "WAL base directory " + baseDirectory
> > +              + " exists but it isn't a directory");
> > +    }
> > +
> > +    openDirectory.mkdir();
> > +    Preconditions.checkState(openDirectory.exists(),
> > +        "Directory doesn't exist: %s", openDirectory);
> > +
> > +    pendingDirectory.mkdir();
> > +    Preconditions.checkState(pendingDirectory.exists(),
> > +        "Directory doesn't exist: %s", pendingDirectory);
> > +
> > +    sentDirectory.mkdir();
> > +    Preconditions.checkState(sentDirectory.exists(),
> > +        "Directory doesn't exist: %s", sentDirectory);
> > +
> > +    completeDirectory.mkdir();
> > +    Preconditions.checkState(completeDirectory.exists(),
> > +        "Directory doesn't exist: %s", completeDirectory);
> > +  }
> > +
> > +  public FileBasedWALWriter getWriter() throws IOException {
> > +    FileBasedWALWriter writer = new FileBasedWALWriter();
> > +
> > +    writer.setFormatter(formatter);
> > +
> > +    if (!isInitialized) {
> > +      initialize();
> > +    }
> > +
> > +    return writer;
> > +  }
> > +
> > +  @Override
> > +  public String toString() {
> > +    return "{ baseDirectory:" + baseDirectory + " openDirectory:"
> > +        + openDirectory + " pendingDirectory:" + pendingDirectory
> > +        + " sentDirectory:" + sentDirectory + " completeDirectory:"
> > +        + completeDirectory + " isInitialized:" + isInitialized + " }";
> > +  }
> > +
> > +  public File getBaseDirectory() {
> > +    return baseDirectory;
> > +  }
> > +
> > +  public File getOpenDirectory() {
> > +    return openDirectory;
> > +  }
> > +
> > +  public File getPendingDirectory() {
> > +    return pendingDirectory;
> > +  }
> > +
> > +  public File getSentDirectory() {
> > +    return sentDirectory;
> > +  }
> > +
> > +  public File getCompleteDirectory() {
> > +    return completeDirectory;
> > +  }
> > +
> > +}
> >
> > Added:
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> > URL:
> >
> http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java?rev=1160466&view=auto
> >
> >
> ==============================================================================
> > ---
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> > (added)
> > +++
> >
> incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/durability/FileBasedWALManager.java
> > Mon Aug 22 22:04:00 2011
> > @@ -0,0 +1,76 @@
> > +package org.apache.flume.durability;
> > +
> > +import java.io.BufferedOutputStream;
> > +import java.io.File;
> > +import java.io.FileNotFoundException;
> > +import java.io.FileOutputStream;
> > +import java.io.IOException;
> > +
> > +import org.apache.flume.Event;
> > +import org.apache.flume.formatter.output.EventFormatter;
> > +
> > +public class FileBasedWALManager {
> > +
> > +  private File directory;
> > +
> > +  public FileBasedWAL getWAL(String name) {
> > +    File walDirectory = new File(directory, name);
> > +    FileBasedWAL wal = new FileBasedWAL(walDirectory);
> > +
> > +    return wal;
> > +  }
> > +
> > +  @Override
> > +  public String toString() {
> > +    return "{ directory:" + directory + " }";
> > +  }
> > +
> > +  public File getDirectory() {
> > +    return directory;
> > +  }
> > +
> > +  public void setDirectory(File directory) {
> > +    this.directory = directory;
> > +  }
> > +
> > +  public static class FileBasedWALWriter {
> > +
> > +    private File file;
> > +    private BufferedOutputStream outputStream;
> > +    private EventFormatter formatter;
> > +
> > +    public void open() throws FileNotFoundException {
> > +      outputStream = new BufferedOutputStream(new
> FileOutputStream(file));
> > +    }
> > +
> > +    public void write(Event event) throws IOException {
> > +      outputStream.write(formatter.format(event));
> > +    }
> > +
> > +    public void close() throws IOException {
> > +      outputStream.close();
> > +    }
> > +
> > +    public void flush() throws IOException {
> > +      outputStream.flush();
> > +    }
> > +
> > +    public File getFile() {
> > +      return file;
> > +    }
> > +
> > +    public void setFile(File file) {
> > +      this.file = file;
> > +    }
> > +
> > +    public EventFormatter getFormatter() {
> > +      return formatter;
> > +    }
> > +
> > +    public void setFormatter(EventFormatter formatter) {
> > +      this.formatter = formatter;
> > +    }
> > +
> > +  }
> > +
> > +}
> >
> >
> >
>
>
> --
> // Jonathan Hsieh (shay)
> // Software Engineer, Cloudera
> // jon@cloudera.com
>



-- 
Eric Sammer
twitter: esammer
data: www.cloudera.com