Posted to commits@storm.apache.org by do...@apache.org on 2016/04/07 18:53:52 UTC

[1/3] storm git commit: STORM-1464: storm-hdfs support for multiple output files and partitioning

Repository: storm
Updated Branches:
  refs/heads/master 4284b09c7 -> f48d7941b


STORM-1464: storm-hdfs support for multiple output files and partitioning


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7554fe20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7554fe20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7554fe20

Branch: refs/heads/master
Commit: 7554fe2042369141d374b149de40949557ee70ea
Parents: 500ef20
Author: Aaron Dossett <aa...@target.com>
Authored: Mon Mar 21 13:06:44 2016 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Mon Mar 21 13:06:44 2016 -0500

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   |  29 ++-
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       | 182 +++++++++++++------
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |  75 +++-----
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    |  39 ++--
 .../storm/hdfs/bolt/SequenceFileBolt.java       |  35 ++--
 .../hdfs/bolt/rotation/FileRotationPolicy.java  |   5 +
 .../bolt/rotation/FileSizeRotationPolicy.java   |   8 +
 .../hdfs/bolt/rotation/NoRotationPolicy.java    |   5 +
 .../hdfs/bolt/rotation/TimedRotationPolicy.java |   8 +
 .../storm/hdfs/common/AbstractHDFSWriter.java   |  68 +++++++
 .../common/AvroGenericRecordHDFSWriter.java     |  80 ++++++++
 .../apache/storm/hdfs/common/HDFSWriter.java    |  66 +++++++
 .../storm/hdfs/common/NullPartitioner.java      |  31 ++++
 .../apache/storm/hdfs/common/Partitioner.java   |  36 ++++
 .../storm/hdfs/common/SequenceFileWriter.java   |  59 ++++++
 .../hdfs/bolt/AvroGenericRecordBoltTest.java    | 105 ++++++++---
 .../apache/storm/hdfs/bolt/TestHdfsBolt.java    |  34 +++-
 .../storm/hdfs/bolt/TestSequenceFileBolt.java   |   4 +-
 .../apache/storm/hdfs/bolt/TestWritersMap.java  |  48 +++++
 19 files changed, 730 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 2fc4c7b..3777481 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -184,6 +184,7 @@ Similar to sync policies, file rotation policies allow you to control when data
 public interface FileRotationPolicy extends Serializable {
     boolean mark(Tuple tuple, long offset);
     void reset();
+    FileRotationPolicy copy();
 }
 ``` 
 
@@ -240,6 +241,23 @@ If you are using Trident and sequence files you can do something like this:
                 .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
 ```
 
+### Data Partitioning
+Data can be partitioned to different HDFS directories based on characteristics of the tuple being processed or purely
+external factors, such as system time.  To partition your data, write a class that implements the ```Partitioner```
+interface and pass it to the withPartitioner() method of your bolt. The getPartitionPath() method returns a partition
+path for a given tuple.
+
+Here's an example of a Partitioner that operates on a specific field of data:
+
+```java
+
+    Partitioner partitioner = new Partitioner() {
+            @Override
+            public String getPartitionPath(Tuple tuple) {
+                return Path.SEPARATOR + tuple.getStringByField("city");
+            }
+     };
+```
 
 ## HDFS Bolt Support for HDFS Sequence Files
 
@@ -303,16 +321,15 @@ The `org.apache.storm.hdfs.bolt.AvroGenericRecordBolt` class allows you to write
         AvroGenericRecordBolt bolt = new AvroGenericRecordBolt()
                 .withFsUrl("hdfs://localhost:54310")
                 .withFileNameFormat(fileNameFormat)
-                .withSchemaAsString(schema)
                 .withRotationPolicy(rotationPolicy)
                 .withSyncPolicy(syncPolicy);
 ```
-The setup is very similar to the `SequenceFileBolt` example above.  The key difference is that instead of specifying a
-`SequenceFormat` you must provide a string representation of an Avro schema through the `withSchemaAsString()` method.
-An `org.apache.avro.Schema` object cannot be directly provided since it does not implement `Serializable`.
 
-The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that conforms to the provided
-schema.
+The Avro bolt will write records to separate files based on the schema of the record being processed.  In other words,
+if the bolt receives records with two different schemas, it will write to two separate files.  Each file will be rotated
+in accordance with the specified rotation policy. If a large number of Avro schemas are expected, then the bolt should
+be configured with a maximum number of open files at least equal to the number of schemas expected to prevent excessive
+file open/close/create operations.
 
 To use this bolt you **must** register the appropriate Kryo serializers with your topology configuration.  A convenience
 method is provided for this:

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index c56f486..5daa0fa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -17,14 +17,12 @@
  */
 package org.apache.storm.hdfs.bolt;
 
-import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
-import org.apache.storm.utils.Utils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +30,9 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.NullPartitioner;
+import org.apache.storm.hdfs.common.Partitioner;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
@@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.List;
@@ -52,15 +55,16 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
      * Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
      */
     private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
+    private static final Integer DEFAULT_MAX_OPEN_FILES = 50;
 
-    protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
-    private Path currentFile;
+    protected Map<String, AbstractHDFSWriter> writers;
+    protected Map<String, Integer> rotationCounterMap = new HashMap<>();
+    protected List<RotationAction> rotationActions = new ArrayList<>();
     protected OutputCollector collector;
     protected transient FileSystem fs;
     protected SyncPolicy syncPolicy;
     protected FileRotationPolicy rotationPolicy;
     protected FileNameFormat fileNameFormat;
-    protected int rotation = 0;
     protected String fsUrl;
     protected String configKey;
     protected transient Object writeLock;
@@ -69,22 +73,21 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     protected long offset = 0;
     protected Integer fileRetryCount = DEFAULT_RETRY_COUNT;
     protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
+    protected Integer maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
+    protected Partitioner partitioner = new NullPartitioner();
 
     protected transient Configuration hdfsConfig;
 
-    protected void rotateOutputFile() throws IOException {
+    protected void rotateOutputFile(AbstractHDFSWriter writer) throws IOException {
         LOG.info("Rotating output file...");
         long start = System.currentTimeMillis();
         synchronized (this.writeLock) {
-            closeOutputFile();
-            this.rotation++;
+            writer.close();
 
-            Path newFile = createOutputFile();
             LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
             for (RotationAction action : this.rotationActions) {
-                action.execute(this.fs, this.currentFile);
+                action.execute(this.fs, writer.getFilePath());
             }
-            this.currentFile = newFile;
         }
         long time = System.currentTimeMillis() - start;
         LOG.info("File rotation took {} ms.", time);
@@ -104,6 +107,8 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
             throw new IllegalStateException("File system URL must be specified.");
         }
 
+        writers = new WritersMap(this.maxOpenFiles);
+
         this.collector = collector;
         this.fileNameFormat.prepare(conf, topologyContext);
         this.hdfsConfig = new Configuration();
@@ -117,26 +122,12 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
         try{
             HdfsSecurityUtil.login(conf, hdfsConfig);
             doPrepare(conf, topologyContext, collector);
-            this.currentFile = createOutputFile();
-
         } catch (Exception e){
             throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
         }
 
         if(this.rotationPolicy instanceof TimedRotationPolicy){
-            long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
-            this.rotationTimer = new Timer(true);
-            TimerTask task = new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        rotateOutputFile();
-                    } catch(IOException e){
-                        LOG.warn("IOException during scheduled file rotation.", e);
-                    }
-                }
-            };
-            this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+            startTimedRotationPolicy();
         }
     }
 
@@ -145,13 +136,20 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
 
         synchronized (this.writeLock) {
             boolean forceSync = false;
+            AbstractHDFSWriter writer = null;
+            String writerKey = null;
+
             if (TupleUtils.isTick(tuple)) {
                 LOG.debug("TICK! forcing a file system flush");
                 this.collector.ack(tuple);
                 forceSync = true;
             } else {
+
+                writerKey = getHashKeyForTuple(tuple);
+
                 try {
-                    writeTuple(tuple);
+                    writer = getOrCreateWriter(writerKey, tuple);
+                    this.offset = writer.write(tuple);
                     tupleBatch.add(tuple);
                 } catch (IOException e) {
                     //If the write failed, try to sync anything already written
@@ -171,7 +169,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
                 while (success == false && attempts < fileRetryCount) {
                     attempts += 1;
                     try {
-                        syncTuples();
+                        syncAllWriters();
                         LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
                         for (Tuple t : tupleBatch) {
                             this.collector.ack(t);
@@ -198,22 +196,53 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
                 }
             }
 
-            if(this.rotationPolicy.mark(tuple, this.offset)) {
-                try {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
-                    this.offset = 0;
-                } catch (IOException e) {
-                    this.collector.reportError(e);
-                    LOG.warn("File could not be rotated");
-                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                    //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
-                    //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
-                }
+            if (writer != null && writer.needsRotation()) {
+                    doRotationAndRemoveWriter(writerKey, writer);
             }
         }
     }
 
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer;
+
+        writer = writers.get(writerKey);
+        if (writer == null) {
+            Path pathForNextFile = getBasePathForNextFile(tuple);
+            writer = makeNewWriter(pathForNextFile, tuple);
+            writers.put(writerKey, writer);
+        }
+        return writer;
+    }
+
+    /**
+     * A tuple must be mapped to a writer based on two factors:
+     *  - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
+     *    for an example of this)
+     *  - the directory the tuple will be partitioned into
+     *
+     * @param tuple
+     * @return
+     */
+    private String getHashKeyForTuple(Tuple tuple) {
+        final String boltKey = getWriterKey(tuple);
+        final String partitionDir = this.partitioner.getPartitionPath(tuple);
+        return boltKey + "****" + partitionDir;
+    }
+
+    void doRotationAndRemoveWriter(String writerKey, AbstractHDFSWriter writer) {
+        try {
+            rotateOutputFile(writer);
+        } catch (IOException e) {
+            this.collector.reportError(e);
+            LOG.error("File could not be rotated");
+            //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
+            //The next tuple will almost certainly fail to write and/or sync, which forces a rotation.  That
+            //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
+        } finally {
+            writers.remove(writerKey);
+        }
+    }
+
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), tickTupleInterval);
@@ -223,29 +252,64 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
-    /**
-     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
-     *
-     * this.offset is also updated to reflect additional data written
-     *
-     * @param tuple
-     * @throws IOException
-     */
-    abstract protected void writeTuple(Tuple tuple) throws IOException;
+    private void syncAllWriters() throws IOException {
+        for (AbstractHDFSWriter writer : writers.values()) {
+            writer.sync();
+        }
+    }
 
-    /**
-     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
-     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
-     * SequenceFileBolt.
-     *
-     * @throws IOException
-     */
-    abstract protected void syncTuples() throws IOException;
+    private void startTimedRotationPolicy() {
+        long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+        this.rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                for (final AbstractHDFSWriter writer : writers.values()) {
+                    try {
+                        rotateOutputFile(writer);
+                    } catch (IOException e) {
+                        LOG.warn("IOException during scheduled file rotation.", e);
+                    }
+                }
+                writers.clear();
+            }
+        };
+        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
 
-    abstract protected void closeOutputFile() throws IOException;
+    protected Path getBasePathForNextFile(Tuple tuple) {
 
-    abstract protected Path createOutputFile() throws IOException;
+        final String partitionPath = this.partitioner.getPartitionPath(tuple);
+        final int rotation;
+        if (rotationCounterMap.containsKey(partitionPath))
+        {
+            rotation = rotationCounterMap.get(partitionPath) + 1;
+        } else {
+            rotation = 0;
+        }
+        rotationCounterMap.put(partitionPath, rotation);
+
+        return new Path(this.fsUrl + this.fileNameFormat.getPath() + partitionPath,
+                this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
+    }
 
     abstract protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
 
+    abstract protected String getWriterKey(Tuple tuple);
+
+    abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException;
+
+    static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
+        final long maxWriters;
+
+        public WritersMap(long maxWriters) {
+            super((int)maxWriters, 0.75f, true);
+            this.maxWriters = maxWriters;
+        }
+
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
+            return this.size() > this.maxWriters;
+        }
+    }
 }
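
A note on the WritersMap class added above: it relies on the access-ordered mode of LinkedHashMap, so the writer dropped when the cap is exceeded is the least recently used one. The sketch below reproduces only that eviction behaviour with plain JDK types. LruSketch and the string values are hypothetical stand-ins for WritersMap and AbstractHDFSWriter, not code from this commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for WritersMap; values are plain objects, not HDFS writers.
class LruSketch<V> extends LinkedHashMap<String, V> {
    private final int maxEntries;

    LruSketch(int maxEntries) {
        // accessOrder = true: iteration runs from least to most recently accessed.
        super(maxEntries, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        // Invoked by put() after the new entry is added; true drops the eldest entry.
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        LruSketch<String> writers = new LruSketch<>(2);
        writers.put("a", "writer-a");
        writers.put("b", "writer-b");
        writers.get("a");              // touch "a", so "b" becomes least recently used
        writers.put("c", "writer-c");  // cap exceeded, "b" is evicted
        System.out.println(writers.keySet()); // prints [a, c]
    }
}
```

In the hunk above, removeEldestEntry only reports whether the cap is exceeded. This part of the diff does not show the evicted writer being closed, so a reader may want to check how eviction interacts with open file handles.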

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
index cdeb2f8..e173d2a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
@@ -17,18 +17,16 @@
  */
 package org.apache.storm.hdfs.bolt;
 
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.AvroGenericRecordHDFSWriter;
+import org.apache.storm.hdfs.common.Partitioner;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
@@ -38,24 +36,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
-import java.util.EnumSet;
 import java.util.Map;
 
 public class AvroGenericRecordBolt extends AbstractHdfsBolt{
 
     private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordBolt.class);
 
-    private transient FSDataOutputStream out;
-    private Schema schema;
-    private String schemaAsString;
-    private DataFileWriter<GenericRecord> avroWriter;
-
-    public AvroGenericRecordBolt withSchemaAsString(String schemaAsString)
-    {
-        this.schemaAsString = schemaAsString;
-        return this;
-    }
-
     public AvroGenericRecordBolt withFsUrl(String fsUrl){
         this.fsUrl = fsUrl;
         return this;
@@ -91,51 +77,36 @@ public class AvroGenericRecordBolt extends AbstractHdfsBolt{
         return this;
     }
 
-    @Override
-    protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
-        LOG.info("Preparing AvroGenericRecord Bolt...");
-        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
-        Schema.Parser parser = new Schema.Parser();
-        this.schema = parser.parse(this.schemaAsString);
+    public AvroGenericRecordBolt withMaxOpenFiles(int maxOpenFiles) {
+        this.maxOpenFiles = maxOpenFiles;
+        return this;
     }
 
-    @Override
-    protected void writeTuple(Tuple tuple) throws IOException {
-        GenericRecord avroRecord = (GenericRecord) tuple.getValue(0);
-        avroWriter.append(avroRecord);
-        offset = this.out.getPos();
+    public AvroGenericRecordBolt withPartitioner(Partitioner partitioner) {
+        this.partitioner = partitioner;
+        return this;
     }
 
     @Override
-    protected void syncTuples() throws IOException {
-        avroWriter.flush();
-
-        LOG.debug("Attempting to sync all data to filesystem");
-        if (this.out instanceof HdfsDataOutputStream) {
-            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-        } else {
-            this.out.hsync();
-        }
-        this.syncPolicy.reset();
+    protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+        LOG.info("Preparing AvroGenericRecord Bolt...");
+        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     }
 
+    /**
+     * AvroGenericRecordBolt must override this method because messages with different schemas cannot be written to the
+     * same file.  By treating the complete schema as the "key" AbstractHdfsBolt will associate a different writer for
+     * every distinct schema.
+     */
     @Override
-    protected void closeOutputFile() throws IOException
-    {
-        avroWriter.close();
-        this.out.close();
+    protected String getWriterKey(Tuple tuple) {
+        Schema recordSchema = ((GenericRecord) tuple.getValue(0)).getSchema();
+        return recordSchema.toString();
     }
 
     @Override
-    protected Path createOutputFile() throws IOException {
-        Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
-        this.out = this.fs.create(path);
-
-        //Initialize writer
-        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        avroWriter = new DataFileWriter<>(datumWriter);
-        avroWriter.create(this.schema, this.out);
-
-        return path;
+    protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
+        Schema recordSchema = ((GenericRecord) tuple.getValue(0)).getSchema();
+        return new AvroGenericRecordHDFSWriter(this.rotationPolicy, path, this.fs.create(path), recordSchema);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 0299f43..614de6b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -23,12 +23,13 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.format.RecordFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.HDFSWriter;
+import org.apache.storm.hdfs.common.Partitioner;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,38 +90,30 @@ public class HdfsBolt extends AbstractHdfsBolt{
         return this;
     }
 
-    @Override
-    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
-        LOG.info("Preparing HDFS Bolt...");
-        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
+    public HdfsBolt withPartitioner(Partitioner partitioner) {
+        this.partitioner = partitioner;
+        return this;
     }
 
-    @Override
-    protected void syncTuples() throws IOException {
-        LOG.debug("Attempting to sync all data to filesystem");
-        if (this.out instanceof HdfsDataOutputStream) {
-            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-        } else {
-            this.out.hsync();
-        }
+    public HdfsBolt withMaxOpenFiles(int maxOpenFiles) {
+        this.maxOpenFiles = maxOpenFiles;
+        return this;
     }
 
     @Override
-    protected void writeTuple(Tuple tuple) throws IOException {
-        byte[] bytes = this.format.format(tuple);
-        out.write(bytes);
-        this.offset += bytes.length;
+    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+        LOG.info("Preparing HDFS Bolt...");
+        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     }
 
     @Override
-    protected void closeOutputFile() throws IOException {
-        this.out.close();
+    protected String getWriterKey(Tuple tuple) {
+        return "CONSTANT";
     }
 
     @Override
-    protected Path createOutputFile() throws IOException {
-        Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+    protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
         this.out = this.fs.create(path);
-        return path;
+        return new HDFSWriter(rotationPolicy,path, out, format);
     }
 }
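
Because HdfsBolt returns a constant from getWriterKey(), the only thing that separates its writers is the partition path appended in AbstractHdfsBolt.getHashKeyForTuple(). A minimal sketch of that key composition follows, assuming a Map in place of a Storm Tuple and "/" in place of Path.SEPARATOR. WriterKeySketch and its methods are hypothetical names used for illustration only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of how a bolt key and a partition path form one writer key.
class WriterKeySketch {
    // Same separator string the diff uses in getHashKeyForTuple().
    static final String SEPARATOR = "****";

    // Stand-in for getWriterKey(): HdfsBolt and SequenceFileBolt return a constant.
    static String boltKey() {
        return "CONSTANT";
    }

    // Stand-in for Partitioner.getPartitionPath(): one directory per "city" value.
    static String partitionPath(Map<String, String> tuple) {
        return "/" + tuple.get("city");
    }

    static String hashKey(Map<String, String> tuple) {
        return boltKey() + SEPARATOR + partitionPath(tuple);
    }

    public static void main(String[] args) {
        Map<String, Integer> tuplesPerWriter = new HashMap<>();
        String[] cities = {"Austin", "Minneapolis", "Austin"};
        for (String city : cities) {
            String key = hashKey(Collections.singletonMap("city", city));
            tuplesPerWriter.merge(key, 1, Integer::sum);
        }
        System.out.println(tuplesPerWriter.size());                     // prints 2
        System.out.println(tuplesPerWriter.get("CONSTANT****/Austin")); // prints 2
    }
}
```

With the default NullPartitioner every tuple would presumably map to the same key, which would preserve the previous one-file-at-a-time behaviour. NullPartitioner's body is not part of this excerpt, so treat that as an assumption.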

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index e0db7c9..3c78075 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -28,6 +28,9 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.format.SequenceFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.Partitioner;
+import org.apache.storm.hdfs.common.SequenceFileWriter;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +107,16 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
         return this;
     }
 
+    public SequenceFileBolt withPartitioner(Partitioner partitioner) {
+        this.partitioner = partitioner;
+        return this;
+    }
+
+    public SequenceFileBolt withMaxOpenFiles(int maxOpenFiles) {
+        this.maxOpenFiles = maxOpenFiles;
+        return this;
+    }
+
     @Override
     public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing Sequence File Bolt...");
@@ -114,30 +127,20 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
     }
 
     @Override
-    protected void syncTuples() throws IOException {
-        LOG.debug("Attempting to sync all data to filesystem");
-        this.writer.hsync();
+    protected String getWriterKey(Tuple tuple) {
+        return "CONSTANT";
     }
 
     @Override
-    protected void writeTuple(Tuple tuple) throws IOException {
-        this.writer.append(this.format.key(tuple), this.format.value(tuple));
-        this.offset = this.writer.getLength();
-    }
-
-    protected Path createOutputFile() throws IOException {
-        Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
-        this.writer = SequenceFile.createWriter(
+    protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
+        SequenceFile.Writer writer = SequenceFile.createWriter(
                 this.hdfsConfig,
-                SequenceFile.Writer.file(p),
+                SequenceFile.Writer.file(path),
                 SequenceFile.Writer.keyClass(this.format.keyClass()),
                 SequenceFile.Writer.valueClass(this.format.valueClass()),
                 SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec))
         );
-        return p;
-    }
 
-    protected void closeOutputFile() throws IOException {
-        this.writer.close();
+        return new SequenceFileWriter(this.rotationPolicy, path, writer, this.format);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
index 90ef772..aeb63fa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
@@ -48,4 +48,9 @@ public interface FileRotationPolicy extends Serializable {
      *
      */
     void reset();
+
+    /**
+     * Must be able to copy the rotation policy
+     */
+    FileRotationPolicy copy();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
index f0df921..5fb9bbc 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
@@ -64,6 +64,10 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
         this.maxBytes = (long)(count * units.getByteCount());
     }
 
+    protected FileSizeRotationPolicy(long maxBytes) {
+        this.maxBytes = maxBytes;
+    }
+
     @Override
     public boolean mark(Tuple tuple, long offset) {
         long diff = offset - this.lastOffset;
@@ -78,4 +82,8 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
         this.lastOffset = 0;
     }
 
+    @Override
+    public FileRotationPolicy copy() {
+        return new FileSizeRotationPolicy(this.maxBytes);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
index 14fa496..a00037b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
@@ -32,4 +32,9 @@ public class NoRotationPolicy implements FileRotationPolicy {
     @Override
     public void reset() {
     }
+
+    @Override
+    public FileRotationPolicy copy() {
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
index 84762a0..06fada8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
@@ -45,6 +45,9 @@ public class TimedRotationPolicy implements FileRotationPolicy {
         this.interval = (long)(count * units.getMilliSeconds());
     }
 
+    protected TimedRotationPolicy(long interval) {
+        this.interval = interval;
+    }
 
     /**
      * Called for every tuple the HdfsBolt executes.
@@ -66,6 +69,11 @@ public class TimedRotationPolicy implements FileRotationPolicy {
 
     }
 
+    @Override
+    public FileRotationPolicy copy() {
+        return new TimedRotationPolicy(this.interval);
+    }
+
     public long getInterval(){
         return this.interval;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
new file mode 100644
index 0000000..4b36377
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+
+import java.io.IOException;
+
+abstract public class AbstractHDFSWriter {
+    long lastUsedTime;
+    long offset;
+    boolean needsRotation;
+    Path filePath;
+    FileRotationPolicy rotationPolicy;
+
+    AbstractHDFSWriter(FileRotationPolicy policy, Path path) {
+        //This must be defensively copied, because a bolt probably has only one rotation policy object
+        this.rotationPolicy = policy.copy();
+        this.filePath = path;
+    }
+
+    final public long write(Tuple tuple) throws IOException{
+        doWrite(tuple);
+        this.needsRotation = rotationPolicy.mark(tuple, offset);
+
+        return this.offset;
+    }
+
+    final public void sync() throws IOException {
+        doSync();
+    }
+
+    final public void close() throws IOException {
+        doClose();
+    }
+
+    public boolean needsRotation() {
+        return needsRotation;
+    }
+
+    public Path getFilePath() {
+        return this.filePath;
+    }
+
+    abstract protected void doWrite(Tuple tuple) throws IOException;
+
+    abstract protected void doSync() throws IOException;
+
+    abstract protected void doClose() throws IOException;
+
+}
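
The defensive copy in the AbstractHDFSWriter constructor matters once a bolt keeps several writers open at the same time. The following is a minimal sketch of the failure mode it avoids, not code from this commit: SketchPolicy, CountPolicy, SketchWriter and CopySketch are hypothetical stand-ins, and the count-based policy is invented for illustration (it is not one of the storm-hdfs rotation policies).

```java
// Hypothetical, simplified stand-in for FileRotationPolicy (no Tuple, no offset).
interface SketchPolicy {
    boolean mark();        // true when the file should be rotated
    SketchPolicy copy();   // fresh policy with the same configuration
}

// Hypothetical policy: rotate after a fixed number of tuples.
class CountPolicy implements SketchPolicy {
    private final int maxTuples;
    private int seen = 0;

    CountPolicy(int maxTuples) {
        this.maxTuples = maxTuples;
    }

    @Override
    public boolean mark() {
        seen++;
        return seen >= maxTuples;
    }

    @Override
    public SketchPolicy copy() {
        // Configuration is carried over, mutable state is not.
        return new CountPolicy(maxTuples);
    }
}

// Hypothetical writer that either copies the policy or shares it.
class SketchWriter {
    private final SketchPolicy policy;
    private boolean needsRotation;

    SketchWriter(SketchPolicy policy, boolean defensiveCopy) {
        this.policy = defensiveCopy ? policy.copy() : policy;
    }

    void write() {
        needsRotation = policy.mark();
    }

    boolean needsRotation() {
        return needsRotation;
    }
}

class CopySketch {
    // Writes two tuples to file A and one to file B, then reports
    // whether B already believes it has to rotate.
    static boolean secondWriterRotatesEarly(boolean defensiveCopy) {
        SketchPolicy boltPolicy = new CountPolicy(3);
        SketchWriter a = new SketchWriter(boltPolicy, defensiveCopy);
        SketchWriter b = new SketchWriter(boltPolicy, defensiveCopy);
        a.write();
        a.write();
        b.write();
        return b.needsRotation();
    }

    public static void main(String[] args) {
        System.out.println("shared policy: " + secondWriterRotatesEarly(false));
        System.out.println("copied policy: " + secondWriterRotatesEarly(true));
    }
}
```

With a shared policy object the tuples written to file A count against file B, so B asks for rotation after a single tuple. With one copy per writer each file is tracked separately.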

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
new file mode 100644
index 0000000..6e957c2
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class);
+
+    private FSDataOutputStream out;
+    private Schema schema;
+    private DataFileWriter<GenericRecord> avroWriter;
+
+    public AvroGenericRecordHDFSWriter(FileRotationPolicy policy, Path path, FSDataOutputStream stream, Schema schema) throws IOException {
+        super(policy, path);
+        this.out = stream;
+        this.schema = schema;
+        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        avroWriter = new DataFileWriter<>(datumWriter);
+        avroWriter.create(this.schema, this.out);
+    }
+
+    @Override
+    protected void doWrite(Tuple tuple) throws IOException {
+        GenericRecord avroRecord = (GenericRecord) tuple.getValue(0);
+        avroWriter.append(avroRecord);
+        offset = this.out.getPos();
+
+        this.needsRotation = this.rotationPolicy.mark(tuple, offset);
+    }
+
+    @Override
+    protected void doSync() throws IOException {
+        avroWriter.flush();
+
+        LOG.debug("Attempting to sync all data to filesystem");
+        if (this.out instanceof HdfsDataOutputStream) {
+            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+        } else {
+            this.out.hsync();
+        }
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        this.avroWriter.close();
+        this.out.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
new file mode 100644
index 0000000..d69d770
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+public class HDFSWriter extends AbstractHDFSWriter{
+
+    private static final Logger LOG = LoggerFactory.getLogger(HDFSWriter.class);
+
+    private FSDataOutputStream out;
+    private RecordFormat format;
+
+    public HDFSWriter(FileRotationPolicy policy, Path path, FSDataOutputStream out, RecordFormat format) {
+        super(policy, path);
+        this.out = out;
+        this.format = format;
+    }
+
+    @Override
+    protected void doWrite(Tuple tuple) throws IOException {
+        byte[] bytes = this.format.format(tuple);
+        out.write(bytes);
+        this.offset += bytes.length;
+    }
+
+    @Override
+    protected void doSync() throws IOException {
+        LOG.info("Attempting to sync all data to filesystem");
+        if (this.out instanceof HdfsDataOutputStream) {
+            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+        } else {
+            this.out.hsync();
+        }
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        this.out.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
new file mode 100644
index 0000000..fd50496
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * The NullPartitioner partitions every tuple to the empty string. In other words, no partition sub directories will
+ * be added to the path.
+ */
+public class NullPartitioner implements Partitioner {
+    @Override
+    public String getPartitionPath(final Tuple tuple) {
+        return "";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
new file mode 100644
index 0000000..6cf0fbd
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface Partitioner extends Serializable{
+
+    /**
+     * Return a relative path that the tuple should be written to. For example, if an HdfsBolt were configured to write
+     * to /common/output and a partitioner returned "/foo" then the bolt should open a file in "/common/output/foo".
+     *
+     * A best practice is to use Path.SEPARATOR instead of a literal "/"
+     *
+     * @param tuple The tuple for which the relative path is being calculated.
+     * @return The relative path under which the tuple should be written.
+     */
+    public String getPartitionPath(final Tuple tuple);
+}
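
As an illustration of the contract described in the Javadoc above, here is a minimal, self-contained sketch of a partitioner that routes tuples by a "city" field. It does not depend on Storm or Hadoop: SketchTuple and SketchPartitioner are stand-ins for org.apache.storm.tuple.Tuple and the Partitioner interface, and CityPartitioner and the resolve helper are hypothetical names. How the bolt actually joins its base directory with the partition path is not shown in this part of the diff, so plain string concatenation is an assumption.

```java
import java.io.Serializable;

// Stand-in for org.apache.storm.tuple.Tuple, reduced to the one accessor used here.
interface SketchTuple {
    String getStringByField(String field);
}

// Mirrors the shape of the Partitioner interface, typed against the stand-in tuple.
interface SketchPartitioner extends Serializable {
    String getPartitionPath(SketchTuple tuple);
}

// Hypothetical partitioner: one sub directory per value of the "city" field.
class CityPartitioner implements SketchPartitioner {
    // Same value as org.apache.hadoop.fs.Path.SEPARATOR.
    static final String SEPARATOR = "/";

    @Override
    public String getPartitionPath(SketchTuple tuple) {
        return SEPARATOR + tuple.getStringByField("city");
    }
}

class PartitionerSketch {
    // Assumed behaviour: the partition path is appended to the base directory.
    static String resolve(String baseDir, SketchPartitioner partitioner, SketchTuple tuple) {
        return baseDir + partitioner.getPartitionPath(tuple);
    }

    public static void main(String[] args) {
        // A tuple whose "city" field is "SFO"; every other field is absent.
        SketchTuple sfo = field -> "city".equals(field) ? "SFO" : null;
        System.out.println(resolve("/common/output", new CityPartitioner(), sfo));
    }
}
```

With the real classes the same idea would be an implementation of Partitioner passed to the bolt through withPartitioner, as the testPartitionedOutput test in this commit does.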

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
new file mode 100644
index 0000000..ec78fd6
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.storm.hdfs.bolt.format.SequenceFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class SequenceFileWriter extends AbstractHDFSWriter{
+
+    private static final Logger LOG = LoggerFactory.getLogger(SequenceFileWriter.class);
+
+    private SequenceFile.Writer writer;
+    private SequenceFormat format;
+
+    public SequenceFileWriter(FileRotationPolicy policy, Path path, SequenceFile.Writer writer, SequenceFormat format) {
+        super(policy, path);
+        this.writer = writer;
+        this.format = format;
+    }
+
+    @Override
+    protected void doWrite(Tuple tuple) throws IOException {
+        this.writer.append(this.format.key(tuple), this.format.value(tuple));
+        this.offset = this.writer.getLength();
+    }
+
+    @Override
+    protected void doSync() throws IOException {
+        LOG.debug("Attempting to sync all data to filesystem");
+        this.writer.hsync();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        this.writer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
index 8ff05bc..cd828da 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
@@ -27,7 +27,7 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
@@ -40,6 +40,7 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.junit.Before;
 import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.Assert;
 
@@ -65,28 +66,38 @@ public class AvroGenericRecordBoltTest {
     private DistributedFileSystem fs;
     private MiniDFSCluster hdfsCluster;
     private static final String testRoot = "/unittest";
-    private static final Schema schema;
+    private static final Schema schema1;
+    private static final Schema schema2;
     private static final Tuple tuple1;
     private static final Tuple tuple2;
-    private static final String userSchema = "{\"type\":\"record\"," +
+    private static final String schemaV1 = "{\"type\":\"record\"," +
             "\"name\":\"myrecord\"," +
             "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
             "{ \"name\":\"int1\", \"type\":\"int\" }]}";
 
+    private static final String schemaV2 = "{\"type\":\"record\"," +
+            "\"name\":\"myrecord\"," +
+            "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
+            "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," +
+            "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+
     static {
 
         Schema.Parser parser = new Schema.Parser();
-        schema = parser.parse(userSchema);
+        schema1 = parser.parse(schemaV1);
+
+        parser = new Schema.Parser();
+        schema2 = parser.parse(schemaV2);
 
-        GenericRecord record1 = new GenericData.Record(schema);
-        record1.put("foo1", "bar1");
-        record1.put("int1", 1);
-        tuple1 = generateTestTuple(record1);
+        GenericRecordBuilder builder1 = new GenericRecordBuilder(schema1);
+        builder1.set("foo1", "bar1");
+        builder1.set("int1", 1);
+        tuple1 = generateTestTuple(builder1.build());
 
-        GenericRecord record2 = new GenericData.Record(schema);
-        record2.put("foo1", "bar2");
-        record2.put("int1", 2);
-        tuple2 = generateTestTuple(record2);
+        GenericRecordBuilder builder2 = new GenericRecordBuilder(schema2);
+        builder2.set("foo1", "bar2");
+        builder2.set("int1", 2);
+        tuple2 = generateTestTuple(builder2.build());
     }
 
     @Mock private OutputCollector collector;
@@ -116,30 +127,76 @@ public class AvroGenericRecordBoltTest {
 
     @Test public void multipleTuplesOneFile() throws IOException
     {
-        AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, userSchema);
+        AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, schemaV1);
 
         bolt.prepare(new Config(), topologyContext, collector);
         bolt.execute(tuple1);
-        bolt.execute(tuple2);
         bolt.execute(tuple1);
-        bolt.execute(tuple2);
+        bolt.execute(tuple1);
+        bolt.execute(tuple1);
 
         Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
-        verifyAllAvroFiles(testRoot, schema);
+        verifyAllAvroFiles(testRoot);
     }
 
     @Test public void multipleTuplesMutliplesFiles() throws IOException
     {
-        AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, userSchema);
+        AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, schemaV1);
+
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.execute(tuple1);
+        bolt.execute(tuple1);
+        bolt.execute(tuple1);
+        bolt.execute(tuple1);
+
+        Assert.assertEquals(4, countNonZeroLengthFiles(testRoot));
+        verifyAllAvroFiles(testRoot);
+    }
+
+    @Test public void forwardSchemaChangeWorks() throws IOException
+    {
+        AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV1);
+
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.execute(tuple1);
+        bolt.execute(tuple2);
+
+        //Schema change should have forced a rotation
+        Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
+
+        verifyAllAvroFiles(testRoot);
+    }
+
+    @Test public void backwardSchemaChangeWorks() throws IOException
+    {
+        AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
+
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.execute(tuple1);
+        bolt.execute(tuple2);
+
+        //Schema changes should have forced file rotations
+        Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
+        verifyAllAvroFiles(testRoot);
+    }
+
+    @Test public void schemaThrashing() throws IOException
+    {
+        AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
 
         bolt.prepare(new Config(), topologyContext, collector);
         bolt.execute(tuple1);
         bolt.execute(tuple2);
         bolt.execute(tuple1);
         bolt.execute(tuple2);
+        bolt.execute(tuple1);
+        bolt.execute(tuple2);
+        bolt.execute(tuple1);
+        bolt.execute(tuple2);
 
-        Assert.assertEquals(4, countNonZeroLengthFiles(testRoot));
-        verifyAllAvroFiles(testRoot, schema);
+        //Two distinct schemas should result in only two files
+        Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
+        verifyAllAvroFiles(testRoot);
     }
 
     private AvroGenericRecordBolt makeAvroBolt(String nameNodeAddr, int countSync, float rotationSizeMB, String schemaAsString) {
@@ -154,7 +211,6 @@ public class AvroGenericRecordBoltTest {
         return new AvroGenericRecordBolt()
                 .withFsUrl(nameNodeAddr)
                 .withFileNameFormat(fieldsFileNameFormat)
-                .withSchemaAsString(schemaAsString)
                 .withRotationPolicy(rotationPolicy)
                 .withSyncPolicy(fieldsSyncPolicy);
     }
@@ -171,12 +227,12 @@ public class AvroGenericRecordBoltTest {
         return new TupleImpl(topologyContext, new Values(record), 1, "");
     }
 
-    private void verifyAllAvroFiles(String path, Schema schema) throws IOException {
+    private void verifyAllAvroFiles(String path) throws IOException {
         Path p = new Path(path);
 
         for (FileStatus file : fs.listStatus(p)) {
             if (file.getLen() > 0) {
-                fileIsGoodAvro(file.getPath(), schema);
+                fileIsGoodAvro(file.getPath());
             }
         }
     }
@@ -194,8 +250,8 @@ public class AvroGenericRecordBoltTest {
         return nonZero;
     }
 
-    private void fileIsGoodAvro (Path path, Schema schema) throws IOException {
-        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
+    private void fileIsGoodAvro (Path path) throws IOException {
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         FSDataInputStream in = fs.open(path, 0);
         FileOutputStream out = new FileOutputStream("target/FOO.avro");
 
@@ -212,7 +268,6 @@ public class AvroGenericRecordBoltTest {
         GenericRecord user = null;
         while (dataFileReader.hasNext()) {
             user = dataFileReader.next(user);
-            System.out.println(user);
         }
 
         file.delete();

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index ecbad8a..e8f0702 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -35,6 +35,7 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.Partitioner;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.Rule;
@@ -109,6 +110,30 @@ public class TestHdfsBolt {
     }
 
     @Test
+    public void testPartitionedOutput() throws IOException {
+        HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, 1000f);
+
+        Partitioner partitioner = new Partitioner() {
+            @Override
+            public String getPartitionPath(Tuple tuple) {
+                return Path.SEPARATOR + tuple.getStringByField("city");
+            }
+        };
+
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.withPartitioner(partitioner);
+
+        bolt.execute(tuple1);
+        bolt.execute(tuple2);
+
+        verify(collector).ack(tuple1);
+        verify(collector).ack(tuple2);
+
+        Assert.assertEquals(1, countNonZeroLengthFiles(testRoot + "/SFO"));
+        Assert.assertEquals(1, countNonZeroLengthFiles(testRoot + "/SJO"));
+    }
+
+    @Test
     public void testTwoTuplesOneFile() throws IOException
     {
         HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f);
@@ -127,8 +152,9 @@ public class TestHdfsBolt {
     @Test
     public void testFailedSync() throws IOException
     {
-        HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, .00001f);
+        HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f);
         bolt.prepare(new Config(), topologyContext, collector);
+        bolt.execute(tuple1);
 
         fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
 
@@ -138,8 +164,8 @@ public class TestHdfsBolt {
 
     }
 
-    // One tuple and one rotation should yield one file with data and one zero length file
-    // The failed executions should not cause rotations and new zero length files
+    // One tuple and one rotation should yield one file with data
+    // The failed executions should not cause rotations or any new files
     @Test
     public void testFailureFilecount() throws IOException, InterruptedException
     {
@@ -168,7 +194,7 @@ public class TestHdfsBolt {
         }
 
         Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
-        Assert.assertEquals(1, countZeroLengthFiles(testRoot));
+        Assert.assertEquals(0, countZeroLengthFiles(testRoot));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
index 870d4ca..9913d9d 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
@@ -127,14 +127,14 @@ public class TestSequenceFileBolt {
     @Test
     public void testFailedSync() throws IOException
     {
-        SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 1, .00001f);
+        SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f);
         bolt.prepare(new Config(), topologyContext, collector);
+        bolt.execute(tuple1);
 
         fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
         // All writes/syncs will fail so this should cause a RuntimeException
         thrown.expect(RuntimeException.class);
         bolt.execute(tuple1);
-
     }
 
     private SequenceFileBolt makeSeqBolt(String nameNodeAddr, int countSync, float rotationSizeMB) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
new file mode 100644
index 0000000..fd99efe
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class TestWritersMap {
+
+    AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2);
+    @Mock AbstractHDFSWriter foo;
+    @Mock AbstractHDFSWriter bar;
+    @Mock AbstractHDFSWriter baz;
+
+    @Test public void testLRUBehavior()
+    {
+        map.put("FOO", foo);
+        map.put("BAR", bar);
+
+        //Access foo to make it most recently used
+        map.get("FOO");
+
+        //Add an element and bar should drop out
+        map.put("BAZ", baz);
+
+        Assert.assertTrue(map.keySet().contains("FOO"));
+        Assert.assertTrue(map.keySet().contains("BAZ"));
+
+        Assert.assertFalse(map.keySet().contains("BAR"));
+    }
+}
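
The LRU behavior that testLRUBehavior checks can be reproduced with a small standalone sketch. It assumes a size-bounded map built on java.util.LinkedHashMap in access order. Whether AbstractHdfsBolt.WritersMap is implemented this way is not shown in this part of the diff, so LruSketch and LruMap are hypothetical names used only for illustration. The test above inspects keys only, so the sketch stores plain strings instead of AbstractHDFSWriter instances.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; this is not the WritersMap class from the commit.
class LruSketch {

    /** A size-bounded map that evicts the least recently accessed entry. */
    static class LruMap<K, V> extends LinkedHashMap<K, V> {
        private final int maxEntries;

        LruMap(int maxEntries) {
            // accessOrder = true: iteration runs from least to most recently accessed
            super(16, 0.75f, true);
            this.maxEntries = maxEntries;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Invoked by put() after insertion; returning true drops the eldest entry
            return size() > maxEntries;
        }
    }

    public static void main(String[] args) {
        LruMap<String, String> map = new LruMap<>(2);
        map.put("FOO", "foo");
        map.put("BAR", "bar");
        map.get("FOO");        // FOO becomes the most recently used entry
        map.put("BAZ", "baz"); // capacity exceeded, so BAR is evicted
        System.out.println(map.keySet()); // prints [FOO, BAZ]
    }
}
```

A map that holds open file writers would presumably also need to close the evicted writer before dropping it. That step is left out here because it depends on the writer API.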


[3/3] storm git commit: Add 1464 to CHANGELOG

Posted by do...@apache.org.
Add 1464 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f48d7941
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f48d7941
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f48d7941

Branch: refs/heads/master
Commit: f48d7941b10483e87a30b4849321c4dc0844a5a5
Parents: 1b6724c
Author: Aaron Dossett <aa...@target.com>
Authored: Thu Apr 7 11:49:15 2016 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Thu Apr 7 11:49:15 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f48d7941/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 890ea48..d8bae07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1464: storm-hdfs support for multiple file outputs
  * STORM-515: Clojure documentation and examples
  * STORM-1279: port backtype.storm.daemon.supervisor to java
  * STORM-1668: Flux silently fails while setting a non-existent property.


[2/3] storm git commit: Merge branch 'STORM-1494' of https://github.com/dossett/storm into STORM-1464

Posted by do...@apache.org.
Merge branch 'STORM-1494' of https://github.com/dossett/storm into STORM-1464


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b6724c2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b6724c2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b6724c2

Branch: refs/heads/master
Commit: 1b6724c2ff4bc61b49c5d0d8956984bf4be68256
Parents: 4284b09 7554fe2
Author: Aaron Dossett <aa...@target.com>
Authored: Thu Apr 7 11:22:11 2016 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Thu Apr 7 11:22:11 2016 -0500

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   |  29 ++-
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       | 182 +++++++++++++------
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |  75 +++-----
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    |  39 ++--
 .../storm/hdfs/bolt/SequenceFileBolt.java       |  35 ++--
 .../hdfs/bolt/rotation/FileRotationPolicy.java  |   5 +
 .../bolt/rotation/FileSizeRotationPolicy.java   |   8 +
 .../hdfs/bolt/rotation/NoRotationPolicy.java    |   5 +
 .../hdfs/bolt/rotation/TimedRotationPolicy.java |   8 +
 .../storm/hdfs/common/AbstractHDFSWriter.java   |  68 +++++++
 .../common/AvroGenericRecordHDFSWriter.java     |  80 ++++++++
 .../apache/storm/hdfs/common/HDFSWriter.java    |  66 +++++++
 .../storm/hdfs/common/NullPartitioner.java      |  31 ++++
 .../apache/storm/hdfs/common/Partitioner.java   |  36 ++++
 .../storm/hdfs/common/SequenceFileWriter.java   |  59 ++++++
 .../hdfs/bolt/AvroGenericRecordBoltTest.java    | 105 ++++++++---
 .../apache/storm/hdfs/bolt/TestHdfsBolt.java    |  34 +++-
 .../storm/hdfs/bolt/TestSequenceFileBolt.java   |   4 +-
 .../apache/storm/hdfs/bolt/TestWritersMap.java  |  48 +++++
 19 files changed, 730 insertions(+), 187 deletions(-)
----------------------------------------------------------------------