Posted to commits@storm.apache.org by do...@apache.org on 2016/04/07 18:53:52 UTC
[1/3] storm git commit: STORM-1464: storm-hdfs support for multiple output files and partitioning
Repository: storm
Updated Branches:
refs/heads/master 4284b09c7 -> f48d7941b
STORM-1464: storm-hdfs support for multiple output files and partitioning
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7554fe20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7554fe20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7554fe20
Branch: refs/heads/master
Commit: 7554fe2042369141d374b149de40949557ee70ea
Parents: 500ef20
Author: Aaron Dossett <aa...@target.com>
Authored: Mon Mar 21 13:06:44 2016 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Mon Mar 21 13:06:44 2016 -0500
----------------------------------------------------------------------
external/storm-hdfs/README.md | 29 ++-
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 182 +++++++++++++------
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 75 +++-----
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 39 ++--
.../storm/hdfs/bolt/SequenceFileBolt.java | 35 ++--
.../hdfs/bolt/rotation/FileRotationPolicy.java | 5 +
.../bolt/rotation/FileSizeRotationPolicy.java | 8 +
.../hdfs/bolt/rotation/NoRotationPolicy.java | 5 +
.../hdfs/bolt/rotation/TimedRotationPolicy.java | 8 +
.../storm/hdfs/common/AbstractHDFSWriter.java | 68 +++++++
.../common/AvroGenericRecordHDFSWriter.java | 80 ++++++++
.../apache/storm/hdfs/common/HDFSWriter.java | 66 +++++++
.../storm/hdfs/common/NullPartitioner.java | 31 ++++
.../apache/storm/hdfs/common/Partitioner.java | 36 ++++
.../storm/hdfs/common/SequenceFileWriter.java | 59 ++++++
.../hdfs/bolt/AvroGenericRecordBoltTest.java | 105 ++++++++---
.../apache/storm/hdfs/bolt/TestHdfsBolt.java | 34 +++-
.../storm/hdfs/bolt/TestSequenceFileBolt.java | 4 +-
.../apache/storm/hdfs/bolt/TestWritersMap.java | 48 +++++
19 files changed, 730 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 2fc4c7b..3777481 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -184,6 +184,7 @@ Similar to sync policies, file rotation policies allow you to control when data
public interface FileRotationPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
+ FileRotationPolicy copy();
}
```
@@ -240,6 +241,23 @@ If you are using Trident and sequence files you can do something like this:
.addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```
+### Data Partitioning
+Data can be partitioned into different HDFS directories based on characteristics of the tuple being processed or on purely
+external factors, such as system time. To partition your data, write a class that implements the `Partitioner`
+interface and pass it to the `withPartitioner()` method of your bolt. The `getPartitionPath()` method returns a partition
+path for a given tuple; that path is appended to the directory configured in the bolt's `FileNameFormat`.
+
+Here's an example of a Partitioner that partitions on the value of the `city` field:
+
+```java
+    Partitioner partitioner = new Partitioner() {
+        @Override
+        public String getPartitionPath(Tuple tuple) {
+            return Path.SEPARATOR + tuple.getStringByField("city");
+        }
+    };
+```
## HDFS Bolt Support for HDFS Sequence Files
@@ -303,16 +321,15 @@ The `org.apache.storm.hdfs.bolt.AvroGenericRecordBolt` class allows you to write
AvroGenericRecordBolt bolt = new AvroGenericRecordBolt()
.withFsUrl("hdfs://localhost:54310")
.withFileNameFormat(fileNameFormat)
- .withSchemaAsString(schema)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
```
-The setup is very similar to the `SequenceFileBolt` example above. The key difference is that instead of specifying a
-`SequenceFormat` you must provide a string representation of an Avro schema through the `withSchemaAsString()` method.
-An `org.apache.avro.Schema` object cannot be directly provided since it does not implement `Serializable`.
-The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that conforms to the provided
-schema.
+The Avro bolt writes records to separate files based on the schema of each record, which is read from the
+`GenericRecord` in each tuple rather than configured up front. If the bolt receives records with two different schemas,
+it writes to two separate files, and each file is rotated in accordance with the configured rotation policy. If a large
+number of Avro schemas is expected, configure the bolt with a maximum number of open files (`withMaxOpenFiles()`) at
+least equal to the number of expected schemas to prevent excessive file open/close/create operations.
To use this bolt you **must** register the appropriate Kryo serializers with your topology configuration. A convenience
method is provided for this:
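The README shows a field-based partitioner and mentions time-based partitioning without an example. Below is a minimal, stdlib-only sketch of both. `SimplePartitioner` and `PartitionerSketch` are hypothetical: the real `Partitioner` receives a Storm `Tuple`, so a map of field values stands in for it, and `"/"` plays the role of `Path.SEPARATOR`. The bolt keeps the partitioner in a non-transient field, so whatever you pass to `withPartitioner()` is serialized with the bolt when the topology is submitted.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.LongSupplier;

public class PartitionerSketch {

    // Hypothetical stand-in for org.apache.storm.hdfs.common.Partitioner. The real
    // interface receives a Storm Tuple; a map of field values stands in for it here.
    interface SimplePartitioner {
        String getPartitionPath(Map<String, Object> fields);
    }

    // Field-based partitioning, mirroring the README's "city" example.
    static final SimplePartitioner BY_CITY = fields -> "/" + fields.get("city");

    // Hypothetical time-based partitioning: one directory per UTC day, e.g. "/2016-03-21".
    // The clock is injected so the result is deterministic in tests.
    static SimplePartitioner byUtcDay(LongSupplier clock) {
        return fields -> {
            // SimpleDateFormat is not thread-safe, so a new instance is created per call.
            SimpleDateFormat day = new SimpleDateFormat("yyyy-MM-dd");
            day.setTimeZone(TimeZone.getTimeZone("UTC"));
            return "/" + day.format(new Date(clock.getAsLong()));
        };
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("city", "Minneapolis");
        System.out.println(BY_CITY.getPartitionPath(tuple)); // /Minneapolis
        System.out.println(byUtcDay(System::currentTimeMillis).getPartitionPath(tuple));
    }
}
```

A time-based partitioner ignores the tuple entirely, so every writer opened during one UTC day lands in the same directory.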
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index c56f486..5daa0fa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -17,14 +17,12 @@
*/
package org.apache.storm.hdfs.bolt;
-import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
-import org.apache.storm.utils.Utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,6 +30,9 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.NullPartitioner;
+import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
import org.slf4j.Logger;
@@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.List;
@@ -52,15 +55,16 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
* Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
*/
private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
+ private static final Integer DEFAULT_MAX_OPEN_FILES = 50;
- protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
- private Path currentFile;
+ protected Map<String, AbstractHDFSWriter> writers;
+ protected Map<String, Integer> rotationCounterMap = new HashMap<>();
+ protected List<RotationAction> rotationActions = new ArrayList<>();
protected OutputCollector collector;
protected transient FileSystem fs;
protected SyncPolicy syncPolicy;
protected FileRotationPolicy rotationPolicy;
protected FileNameFormat fileNameFormat;
- protected int rotation = 0;
protected String fsUrl;
protected String configKey;
protected transient Object writeLock;
@@ -69,22 +73,21 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
protected long offset = 0;
protected Integer fileRetryCount = DEFAULT_RETRY_COUNT;
protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
+ protected Integer maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
+ protected Partitioner partitioner = new NullPartitioner();
protected transient Configuration hdfsConfig;
- protected void rotateOutputFile() throws IOException {
+ protected void rotateOutputFile(AbstractHDFSWriter writer) throws IOException {
LOG.info("Rotating output file...");
long start = System.currentTimeMillis();
synchronized (this.writeLock) {
- closeOutputFile();
- this.rotation++;
+ writer.close();
- Path newFile = createOutputFile();
LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
for (RotationAction action : this.rotationActions) {
- action.execute(this.fs, this.currentFile);
+ action.execute(this.fs, writer.getFilePath());
}
- this.currentFile = newFile;
}
long time = System.currentTimeMillis() - start;
LOG.info("File rotation took {} ms.", time);
@@ -104,6 +107,8 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
throw new IllegalStateException("File system URL must be specified.");
}
+ writers = new WritersMap(this.maxOpenFiles);
+
this.collector = collector;
this.fileNameFormat.prepare(conf, topologyContext);
this.hdfsConfig = new Configuration();
@@ -117,26 +122,12 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
try{
HdfsSecurityUtil.login(conf, hdfsConfig);
doPrepare(conf, topologyContext, collector);
- this.currentFile = createOutputFile();
-
} catch (Exception e){
throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
}
if(this.rotationPolicy instanceof TimedRotationPolicy){
- long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
- this.rotationTimer = new Timer(true);
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- try {
- rotateOutputFile();
- } catch(IOException e){
- LOG.warn("IOException during scheduled file rotation.", e);
- }
- }
- };
- this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+ startTimedRotationPolicy();
}
}
@@ -145,13 +136,20 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
synchronized (this.writeLock) {
boolean forceSync = false;
+ AbstractHDFSWriter writer = null;
+ String writerKey = null;
+
if (TupleUtils.isTick(tuple)) {
LOG.debug("TICK! forcing a file system flush");
this.collector.ack(tuple);
forceSync = true;
} else {
+
+ writerKey = getHashKeyForTuple(tuple);
+
try {
- writeTuple(tuple);
+ writer = getOrCreateWriter(writerKey, tuple);
+ this.offset = writer.write(tuple);
tupleBatch.add(tuple);
} catch (IOException e) {
//If the write failed, try to sync anything already written
@@ -171,7 +169,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
while (success == false && attempts < fileRetryCount) {
attempts += 1;
try {
- syncTuples();
+ syncAllWriters();
LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
for (Tuple t : tupleBatch) {
this.collector.ack(t);
@@ -198,22 +196,53 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
}
}
- if(this.rotationPolicy.mark(tuple, this.offset)) {
- try {
- rotateOutputFile();
- this.rotationPolicy.reset();
- this.offset = 0;
- } catch (IOException e) {
- this.collector.reportError(e);
- LOG.warn("File could not be rotated");
- //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
- //The next tuple will almost certainly fail to write and/or sync, which force a rotation. That
- //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
- }
+ if (writer != null && writer.needsRotation()) {
+ doRotationAndRemoveWriter(writerKey, writer);
}
}
}
+ private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+ AbstractHDFSWriter writer;
+
+ writer = writers.get(writerKey);
+ if (writer == null) {
+ Path pathForNextFile = getBasePathForNextFile(tuple);
+ writer = makeNewWriter(pathForNextFile, tuple);
+ writers.put(writerKey, writer);
+ }
+ return writer;
+ }
+
+ /**
+ * A tuple must be mapped to a writer based on two factors:
+ * - bolt-specific logic that must separate tuples into different files in the same directory (see the Avro bolt
+ * for an example of this)
+ * - the directory the tuple will be partitioned into
+ *
+ * @param tuple the tuple being processed
+ * @return a key combining the bolt-specific writer key and the partition path
+ */
+ private String getHashKeyForTuple(Tuple tuple) {
+ final String boltKey = getWriterKey(tuple);
+ final String partitionDir = this.partitioner.getPartitionPath(tuple);
+ return boltKey + "****" + partitionDir;
+ }
+
+ void doRotationAndRemoveWriter(String writerKey, AbstractHDFSWriter writer) {
+ try {
+ rotateOutputFile(writer);
+ } catch (IOException e) {
+ this.collector.reportError(e);
+ LOG.error("File could not be rotated");
+ //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
+ //The next tuple will almost certainly fail to write and/or sync, which will force a rotation. That
+ //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
+ } finally {
+ writers.remove(writerKey);
+ }
+ }
+
@Override
public Map<String, Object> getComponentConfiguration() {
return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), tickTupleInterval);
@@ -223,29 +252,64 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
- /**
- * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
- *
- * this.offset is also updated to reflect additional data written
- *
- * @param tuple
- * @throws IOException
- */
- abstract protected void writeTuple(Tuple tuple) throws IOException;
+ private void syncAllWriters() throws IOException {
+ for (AbstractHDFSWriter writer : writers.values()) {
+ writer.sync();
+ }
+ }
- /**
- * Make the best effort to sync written data to the underlying file system. Concrete classes should very clearly
- * state the file state that sync guarantees. For example, HdfsBolt can make a much stronger guarantee than
- * SequenceFileBolt.
- *
- * @throws IOException
- */
- abstract protected void syncTuples() throws IOException;
+ private void startTimedRotationPolicy() {
+ long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+ this.rotationTimer = new Timer(true);
+ TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ for (final AbstractHDFSWriter writer : writers.values()) {
+ try {
+ rotateOutputFile(writer);
+ } catch (IOException e) {
+ LOG.warn("IOException during scheduled file rotation.", e);
+ }
+ }
+ writers.clear();
+ }
+ };
+ this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+ }
- abstract protected void closeOutputFile() throws IOException;
+ protected Path getBasePathForNextFile(Tuple tuple) {
- abstract protected Path createOutputFile() throws IOException;
+ final String partitionPath = this.partitioner.getPartitionPath(tuple);
+ final int rotation;
+ if (rotationCounterMap.containsKey(partitionPath)) {
+ rotation = rotationCounterMap.get(partitionPath) + 1;
+ } else {
+ rotation = 0;
+ }
+ rotationCounterMap.put(partitionPath, rotation);
+
+ return new Path(this.fsUrl + this.fileNameFormat.getPath() + partitionPath,
+ this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
+ }
abstract protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
+ abstract protected String getWriterKey(Tuple tuple);
+
+ abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException;
+
+ static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
+ final long maxWriters;
+
+ public WritersMap(long maxWriters) {
+ super((int)maxWriters, 0.75f, true);
+ this.maxWriters = maxWriters;
+ }
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
+ return this.size() > this.maxWriters;
+ }
+ }
}
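`WritersMap` bounds the number of open writers using `LinkedHashMap` in access-order mode: `get()` moves an entry to the tail, and `removeEldestEntry()` evicts the least recently used entry once `maxOpenFiles` is exceeded. The sketch below reproduces that construction without Storm, with plain strings as values instead of `AbstractHDFSWriter`s. The `onEvict` callback and the class names are hypothetical additions: in the diff, `removeEldestEntry()` only returns `true`, so an evicted writer is dropped from the map without `close()` being called on it, and the callback marks one place such cleanup could hook in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class WritersMapSketch {

    // Same construction as WritersMap (accessOrder = true), generic over the value type.
    static class LruMap<V> extends LinkedHashMap<String, V> {
        private final long maxEntries;
        private final Consumer<V> onEvict; // hypothetical hook, not in the diff

        LruMap(long maxEntries, Consumer<V> onEvict) {
            super((int) maxEntries, 0.75f, true);
            this.maxEntries = maxEntries;
            this.onEvict = onEvict;
        }

        // Called by put(); returning true removes the least recently accessed entry.
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
            boolean evict = size() > maxEntries;
            if (evict) {
                onEvict.accept(eldest.getValue());
            }
            return evict;
        }
    }

    static LruMap<String> demo(List<String> evicted) {
        LruMap<String> writers = new LruMap<>(2, evicted::add);
        writers.put("a", "writer-a");
        writers.put("b", "writer-b");
        writers.get("a");             // touching "a" makes "b" the least recently used
        writers.put("c", "writer-c"); // exceeds the limit of 2, so "b" is evicted
        return writers;
    }

    public static void main(String[] args) {
        List<String> evicted = new ArrayList<>();
        LruMap<String> writers = demo(evicted);
        System.out.println(writers.keySet()); // [a, c]
        System.out.println(evicted);          // [writer-b]
        System.out.println(evicted.equals(Arrays.asList("writer-b"))); // true
    }
}
```

Because `getOrCreateWriter()` goes through `writers.get()`, a busy partition keeps its writer near the tail, and only idle writers are evicted.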
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
index cdeb2f8..e173d2a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
@@ -17,18 +17,16 @@
*/
package org.apache.storm.hdfs.bolt;
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.AvroGenericRecordHDFSWriter;
+import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
@@ -38,24 +36,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
-import java.util.EnumSet;
import java.util.Map;
public class AvroGenericRecordBolt extends AbstractHdfsBolt{
private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordBolt.class);
- private transient FSDataOutputStream out;
- private Schema schema;
- private String schemaAsString;
- private DataFileWriter<GenericRecord> avroWriter;
-
- public AvroGenericRecordBolt withSchemaAsString(String schemaAsString)
- {
- this.schemaAsString = schemaAsString;
- return this;
- }
-
public AvroGenericRecordBolt withFsUrl(String fsUrl){
this.fsUrl = fsUrl;
return this;
@@ -91,51 +77,36 @@ public class AvroGenericRecordBolt extends AbstractHdfsBolt{
return this;
}
- @Override
- protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
- LOG.info("Preparing AvroGenericRecord Bolt...");
- this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
- Schema.Parser parser = new Schema.Parser();
- this.schema = parser.parse(this.schemaAsString);
+ public AvroGenericRecordBolt withMaxOpenFiles(int maxOpenFiles) {
+ this.maxOpenFiles = maxOpenFiles;
+ return this;
}
- @Override
- protected void writeTuple(Tuple tuple) throws IOException {
- GenericRecord avroRecord = (GenericRecord) tuple.getValue(0);
- avroWriter.append(avroRecord);
- offset = this.out.getPos();
+ public AvroGenericRecordBolt withPartitioner(Partitioner partitioner) {
+ this.partitioner = partitioner;
+ return this;
}
@Override
- protected void syncTuples() throws IOException {
- avroWriter.flush();
-
- LOG.debug("Attempting to sync all data to filesystem");
- if (this.out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
- } else {
- this.out.hsync();
- }
- this.syncPolicy.reset();
+ protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+ LOG.info("Preparing AvroGenericRecord Bolt...");
+ this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
}
+ /**
+ * AvroGenericRecordBolt must override this method because messages with different schemas cannot be written to the
+ * same file. By treating the complete schema as the "key" AbstractHdfsBolt will associate a different writer for
+ * every distinct schema.
+ */
@Override
- protected void closeOutputFile() throws IOException
- {
- avroWriter.close();
- this.out.close();
+ protected String getWriterKey(Tuple tuple) {
+ Schema recordSchema = ((GenericRecord) tuple.getValue(0)).getSchema();
+ return recordSchema.toString();
}
@Override
- protected Path createOutputFile() throws IOException {
- Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
- this.out = this.fs.create(path);
-
- //Initialize writer
- DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
- avroWriter = new DataFileWriter<>(datumWriter);
- avroWriter.create(this.schema, this.out);
-
- return path;
+ protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
+ Schema recordSchema = ((GenericRecord) tuple.getValue(0)).getSchema();
+ return new AvroGenericRecordHDFSWriter(this.rotationPolicy, path, this.fs.create(path), recordSchema);
}
}
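Writer selection combines two keys. `getWriterKey()` returns the full schema string for `AvroGenericRecordBolt` and the constant `"CONSTANT"` for `HdfsBolt` and `SequenceFileBolt`. `getHashKeyForTuple()` joins that key and the partition path with `"****"`. File numbering is tracked separately, per partition path, in `getBasePathForNextFile()`. The sketch below models both with strings only. The schema strings are placeholders for `Schema.toString()` output, and `WriterKeySketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

public class WriterKeySketch {
    // Per-partition rotation counters, like rotationCounterMap in AbstractHdfsBolt.
    private final Map<String, Integer> rotationCounterMap = new HashMap<>();

    // Mirrors getHashKeyForTuple(): bolt-specific key plus partition directory.
    static String hashKey(String writerKey, String partitionPath) {
        return writerKey + "****" + partitionPath;
    }

    // Mirrors the counter logic in getBasePathForNextFile(): 0 for the first file in a
    // partition, then 1, 2, ... for each later file in that same partition.
    int nextRotation(String partitionPath) {
        return rotationCounterMap.merge(partitionPath, 0, (previous, ignored) -> previous + 1);
    }

    public static void main(String[] args) {
        String schemaA = "{\"type\":\"record\",\"name\":\"A\",\"fields\":[]}"; // placeholder
        String schemaB = "{\"type\":\"record\",\"name\":\"B\",\"fields\":[]}"; // placeholder
        WriterKeySketch bolt = new WriterKeySketch();

        // Two schemas in the same partition map to two different writers...
        System.out.println(hashKey(schemaA, "/Minneapolis"));
        System.out.println(hashKey(schemaB, "/Minneapolis"));
        // ...but draw file numbers from one sequence for that partition.
        System.out.println(bolt.nextRotation("/Minneapolis")); // 0 (first file, schema A)
        System.out.println(bolt.nextRotation("/Minneapolis")); // 1 (next file, schema B)
        System.out.println(bolt.nextRotation("/Chicago"));     // 0
    }
}
```

The counter is keyed by partition path rather than by writer key, so files for different schemas in one directory get distinct rotation numbers. With a `FileNameFormat` that embeds the rotation number, this helps keep their names apart.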
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 0299f43..614de6b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -23,12 +23,13 @@ import org.apache.storm.tuple.Tuple;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.HDFSWriter;
+import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,38 +90,30 @@ public class HdfsBolt extends AbstractHdfsBolt{
return this;
}
- @Override
- public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
- LOG.info("Preparing HDFS Bolt...");
- this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
+ public HdfsBolt withPartitioner(Partitioner partitioner) {
+ this.partitioner = partitioner;
+ return this;
}
- @Override
- protected void syncTuples() throws IOException {
- LOG.debug("Attempting to sync all data to filesystem");
- if (this.out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
- } else {
- this.out.hsync();
- }
+ public HdfsBolt withMaxOpenFiles(int maxOpenFiles) {
+ this.maxOpenFiles = maxOpenFiles;
+ return this;
}
@Override
- protected void writeTuple(Tuple tuple) throws IOException {
- byte[] bytes = this.format.format(tuple);
- out.write(bytes);
- this.offset += bytes.length;
+ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+ LOG.info("Preparing HDFS Bolt...");
+ this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
}
@Override
- protected void closeOutputFile() throws IOException {
- this.out.close();
+ protected String getWriterKey(Tuple tuple) {
+ return "CONSTANT";
}
@Override
- protected Path createOutputFile() throws IOException {
- Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+ protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
this.out = this.fs.create(path);
- return path;
+ return new HDFSWriter(rotationPolicy, path, out, format);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index e0db7c9..3c78075 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -28,6 +28,9 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.SequenceFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.hdfs.common.Partitioner;
+import org.apache.storm.hdfs.common.SequenceFileWriter;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,6 +107,16 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
return this;
}
+ public SequenceFileBolt withPartitioner(Partitioner partitioner) {
+ this.partitioner = partitioner;
+ return this;
+ }
+
+ public SequenceFileBolt withMaxOpenFiles(int maxOpenFiles) {
+ this.maxOpenFiles = maxOpenFiles;
+ return this;
+ }
+
@Override
public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
LOG.info("Preparing Sequence File Bolt...");
@@ -114,30 +127,20 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
}
@Override
- protected void syncTuples() throws IOException {
- LOG.debug("Attempting to sync all data to filesystem");
- this.writer.hsync();
+ protected String getWriterKey(Tuple tuple) {
+ return "CONSTANT";
}
@Override
- protected void writeTuple(Tuple tuple) throws IOException {
- this.writer.append(this.format.key(tuple), this.format.value(tuple));
- this.offset = this.writer.getLength();
- }
-
- protected Path createOutputFile() throws IOException {
- Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
- this.writer = SequenceFile.createWriter(
+ protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
+ SequenceFile.Writer writer = SequenceFile.createWriter(
this.hdfsConfig,
- SequenceFile.Writer.file(p),
+ SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(this.format.keyClass()),
SequenceFile.Writer.valueClass(this.format.valueClass()),
SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec))
);
- return p;
- }
- protected void closeOutputFile() throws IOException {
- this.writer.close();
+ return new SequenceFileWriter(this.rotationPolicy, path, writer, this.format);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
index 90ef772..aeb63fa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
@@ -48,4 +48,9 @@ public interface FileRotationPolicy extends Serializable {
*
*/
void reset();
+
+ /**
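+ * Returns a copy of this rotation policy. Each writer gets its own copy, so implementations that keep per-file
+ * state must return a new instance (stateless implementations may return {@code this}).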
+ */
+ FileRotationPolicy copy();
}
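The new `copy()` contract exists because `AbstractHDFSWriter` copies the bolt's single policy for every writer it creates. The sketch below shows why that matters, using a hypothetical tuple-count policy that is not part of storm-hdfs and a simplified interface whose `mark()` omits the `Tuple` parameter.

```java
public class CopyPolicySketch {

    // Simplified stand-in for FileRotationPolicy (the real mark() also takes a Tuple).
    interface RotationPolicy {
        boolean mark(long offset);
        void reset();
        RotationPolicy copy();
    }

    // Hypothetical stateful policy: request rotation after every maxTuples writes.
    static class TupleCountRotationPolicy implements RotationPolicy {
        private final int maxTuples;
        private int count = 0;

        TupleCountRotationPolicy(int maxTuples) {
            this.maxTuples = maxTuples;
        }

        @Override
        public boolean mark(long offset) {
            count++;
            return count >= maxTuples;
        }

        @Override
        public void reset() {
            count = 0;
        }

        // Same configuration, fresh state: what the defensive copy relies on.
        @Override
        public RotationPolicy copy() {
            return new TupleCountRotationPolicy(maxTuples);
        }
    }

    // Writers A and B write interleaved as A, B, A. Returns whether A's policy
    // asks for rotation after A's second tuple.
    static boolean rotationAfterTwoTuplesOnA(RotationPolicy forA, RotationPolicy forB) {
        forA.mark(0);
        forB.mark(0);
        return forA.mark(0);
    }

    public static void main(String[] args) {
        RotationPolicy template = new TupleCountRotationPolicy(3);
        // Per-writer copies: A has written two of its three tuples, so no rotation yet.
        System.out.println(rotationAfterTwoTuplesOnA(template.copy(), template.copy())); // false
        // Shared instance: B's tuple counts against A, so A rotates after only two tuples.
        System.out.println(rotationAfterTwoTuplesOnA(template, template)); // true
    }
}
```

By the same reasoning, `NoRotationPolicy.copy()` can safely return `this`, because that class holds no state.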
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
index f0df921..5fb9bbc 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
@@ -64,6 +64,10 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
this.maxBytes = (long)(count * units.getByteCount());
}
+ protected FileSizeRotationPolicy(long maxBytes) {
+ this.maxBytes = maxBytes;
+ }
+
@Override
public boolean mark(Tuple tuple, long offset) {
long diff = offset - this.lastOffset;
@@ -78,4 +82,8 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
this.lastOffset = 0;
}
+ @Override
+ public FileRotationPolicy copy() {
+ return new FileSizeRotationPolicy(this.maxBytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
index 14fa496..a00037b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java
@@ -32,4 +32,9 @@ public class NoRotationPolicy implements FileRotationPolicy {
@Override
public void reset() {
}
+
+ @Override
+ public FileRotationPolicy copy() {
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
index 84762a0..06fada8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java
@@ -45,6 +45,9 @@ public class TimedRotationPolicy implements FileRotationPolicy {
this.interval = (long)(count * units.getMilliSeconds());
}
+ protected TimedRotationPolicy(long interval) {
+ this.interval = interval;
+ }
/**
* Called for every tuple the HdfsBolt executes.
@@ -66,6 +69,11 @@ public class TimedRotationPolicy implements FileRotationPolicy {
}
+ @Override
+ public FileRotationPolicy copy() {
+ return new TimedRotationPolicy(this.interval);
+ }
+
public long getInterval(){
return this.interval;
}
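`startTimedRotationPolicy()` rotates every open writer from a `java.util.Timer` thread. `WritersMap` is access-ordered, and for access-ordered `LinkedHashMap`s even `get()` is a structural modification. In the diff, `execute()` calls `writers.get()` under `writeLock`, while the timer task iterates `writers.values()` and calls `writers.clear()` outside that lock; only `rotateOutputFile()` synchronizes. The sketch below is one hypothetical variant that holds `writeLock` for the whole sweep. Writers are plain strings, and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

public class TimedRotationSketch {
    private final Object writeLock = new Object();
    // Access-ordered like WritersMap, so even get() reorders entries.
    private final Map<String, String> writers = new LinkedHashMap<>(16, 0.75f, true);
    private final List<String> rotated = new ArrayList<>();
    private Timer rotationTimer;

    // Stand-in for the execute() path: lookups happen under writeLock.
    void touch(String key) {
        synchronized (writeLock) {
            writers.putIfAbsent(key, "writer-" + key);
            writers.get(key);
        }
    }

    // Rotates every open writer while holding writeLock for the entire sweep.
    void rotateAll() {
        synchronized (writeLock) {
            for (String writer : writers.values()) {
                rotated.add(writer); // a real bolt would close the file and run RotationActions here
            }
            writers.clear();
        }
    }

    void start(long intervalMillis) {
        rotationTimer = new Timer(true); // daemon thread, as in the diff
        rotationTimer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                rotateAll();
            }
        }, intervalMillis, intervalMillis);
    }

    List<String> rotated() {
        synchronized (writeLock) {
            return new ArrayList<>(rotated);
        }
    }

    int openWriters() {
        synchronized (writeLock) {
            return writers.size();
        }
    }
}
```

The trade-off is that `execute()` blocks for the duration of a full sweep. Per-writer locking does not avoid that cost safely here, because the map itself is being reordered.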
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
new file mode 100644
index 0000000..4b36377
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+
+import java.io.IOException;
+
+abstract public class AbstractHDFSWriter {
+ long lastUsedTime;
+ long offset;
+ boolean needsRotation;
+ Path filePath;
+ FileRotationPolicy rotationPolicy;
+
+ AbstractHDFSWriter(FileRotationPolicy policy, Path path) {
+ //This must be defensively copied, because a bolt probably has only one rotation policy object
+ this.rotationPolicy = policy.copy();
+ this.filePath = path;
+ }
+
+ final public long write(Tuple tuple) throws IOException{
+ doWrite(tuple);
+ this.needsRotation = rotationPolicy.mark(tuple, offset);
+
+ return this.offset;
+ }
+
+ final public void sync() throws IOException {
+ doSync();
+ }
+
+ final public void close() throws IOException {
+ doClose();
+ }
+
+ public boolean needsRotation() {
+ return needsRotation;
+ }
+
+ public Path getFilePath() {
+ return this.filePath;
+ }
+
+ abstract protected void doWrite(Tuple tuple) throws IOException;
+
+ abstract protected void doSync() throws IOException;
+
+ abstract protected void doClose() throws IOException;
+
+}
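`AbstractHDFSWriter` follows the template-method pattern. The `final` methods `write`, `sync` and `close` delegate to the abstract `doWrite`, `doSync` and `doClose`. After each record, `write` passes the updated offset to the rotation policy. The sketch below reproduces that shape with an in-memory stream instead of HDFS. The class names, the `String` records and the lambda-based rotation check are assumptions made for illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class WriterTemplateDemo {

    // Hypothetical simplified rotation check on the current offset.
    interface RotationPolicy {
        boolean mark(long offset);
    }

    // Mirrors the shape of AbstractHDFSWriter: subclasses write and advance the
    // offset; the final write() consults the rotation policy afterwards.
    abstract static class AbstractWriter {
        protected long offset;
        private boolean needsRotation;
        private final RotationPolicy policy;

        AbstractWriter(RotationPolicy policy) { this.policy = policy; }

        final long write(String record) throws IOException {
            doWrite(record);
            needsRotation = policy.mark(offset);
            return offset;
        }

        boolean needsRotation() { return needsRotation; }

        protected abstract void doWrite(String record) throws IOException;
    }

    // In-memory stand-in for HDFSWriter: the offset advances by the bytes written.
    static final class BytesWriter extends AbstractWriter {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        BytesWriter(RotationPolicy policy) { super(policy); }

        @Override
        protected void doWrite(String record) throws IOException {
            byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
            out.write(bytes);
            offset += bytes.length;
        }
    }

    public static void main(String[] args) throws IOException {
        // Rotate once at least 10 bytes have been written.
        BytesWriter w = new BytesWriter(offset -> offset >= 10);
        System.out.println(w.write("abcd") + " " + w.needsRotation()); // 5 false
        System.out.println(w.write("efgh") + " " + w.needsRotation()); // 10 true
    }
}
```

Because `write` calls `mark` itself, a subclass only needs to keep `offset` current inside `doWrite`.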
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
new file mode 100644
index 0000000..6e957c2
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class);
+
+ private FSDataOutputStream out;
+ private Schema schema;
+ private DataFileWriter<GenericRecord> avroWriter;
+
+ public AvroGenericRecordHDFSWriter(FileRotationPolicy policy, Path path, FSDataOutputStream stream, Schema schema) throws IOException {
+ super(policy, path);
+ this.out = stream;
+ this.schema = schema;
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ avroWriter = new DataFileWriter<>(datumWriter);
+ avroWriter.create(this.schema, this.out);
+ }
+
+ @Override
+ protected void doWrite(Tuple tuple) throws IOException {
+ GenericRecord avroRecord = (GenericRecord) tuple.getValue(0);
+ avroWriter.append(avroRecord);
+ offset = this.out.getPos();
+
+ this.needsRotation = this.rotationPolicy.mark(tuple, offset);
+ }
+
+ @Override
+ protected void doSync() throws IOException {
+ avroWriter.flush();
+
+ LOG.debug("Attempting to sync all data to filesystem");
+ if (this.out instanceof HdfsDataOutputStream) {
+ ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ } else {
+ this.out.hsync();
+ }
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ this.avroWriter.close();
+ this.out.close();
+ }
+}
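Each `AvroGenericRecordHDFSWriter` calls `avroWriter.create(this.schema, this.out)`, so every file it produces carries exactly one schema in its header. The updated `AvroGenericRecordBoltTest` cases expect a schema change to start a new file. They also expect that alternating between two schemas produces only two files. This suggests that the bolt keeps one writer per schema. `AvroGenericRecordBolt` itself is not shown in this excerpt, so the sketch below is an assumption. It uses a hypothetical map keyed by partition path plus schema, with a fake writer standing in for Avro's `DataFileWriter`. A production key might use a compact schema fingerprint instead of the full schema text. Avro provides `SchemaNormalization.parsingFingerprint64` for this purpose.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SchemaKeyedWritersDemo {

    // Hypothetical stand-in for a DataFileWriter: one open file bound to one schema.
    static final class FakeAvroWriter {
        final String schema;
        int appended = 0;

        FakeAvroWriter(String schema) { this.schema = schema; }

        void append(String datum) { appended++; }
    }

    // Hypothetical key: (partition path, schema). List.of gives value-based equals/hashCode.
    private final Map<List<String>, FakeAvroWriter> writers = new HashMap<>();

    // Route a datum to the writer for its partition and schema, opening one if needed,
    // so no file ever receives data whose schema differs from its header.
    void write(String partitionPath, String schema, String datum) {
        writers.computeIfAbsent(List.of(partitionPath, schema), key -> new FakeAvroWriter(schema))
               .append(datum);
    }

    int openFiles() { return writers.size(); }

    public static void main(String[] args) {
        SchemaKeyedWritersDemo bolt = new SchemaKeyedWritersDemo();
        // Alternate two schemas, as schemaThrashing does with tuple1 and tuple2.
        for (int i = 0; i < 4; i++) {
            bolt.write("", "schemaV1", "datum");
            bolt.write("", "schemaV2", "datum");
        }
        System.out.println(bolt.openFiles()); // 2
    }
}
```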
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
new file mode 100644
index 0000000..d69d770
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+public class HDFSWriter extends AbstractHDFSWriter{
+
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSWriter.class);
+
+ private FSDataOutputStream out;
+ private RecordFormat format;
+
+ public HDFSWriter(FileRotationPolicy policy, Path path, FSDataOutputStream out, RecordFormat format) {
+ super(policy, path);
+ this.out = out;
+ this.format = format;
+ }
+
+ @Override
+ protected void doWrite(Tuple tuple) throws IOException {
+ byte[] bytes = this.format.format(tuple);
+ out.write(bytes);
+ this.offset += bytes.length;
+ }
+
+ @Override
+ protected void doSync() throws IOException {
+ LOG.info("Attempting to sync all data to filesystem");
+ if (this.out instanceof HdfsDataOutputStream) {
+ ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ } else {
+ this.out.hsync();
+ }
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ this.out.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
new file mode 100644
index 0000000..fd50496
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * The NullPartitioner partitions every tuple to the empty string. In other words, no partition subdirectories will
+ * be added to the path.
+ */
+public class NullPartitioner implements Partitioner {
+ @Override
+ public String getPartitionPath(final Tuple tuple) {
+ return "";
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
new file mode 100644
index 0000000..6cf0fbd
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface Partitioner extends Serializable{
+
+ /**
+ * Return a relative path that the tuple should be written to. For example, if an HdfsBolt were configured to write
+ * to /common/output and a partitioner returned "/foo", then the bolt should open a file in "/common/output/foo".
+ *
+ * A best practice is to use Path.SEPARATOR instead of a literal "/".
+ *
+ * @param tuple The tuple for which the relative path is being calculated.
+ * @return The relative partition path for the tuple, or the empty string for no partitioning.
+ */
+ public String getPartitionPath(final Tuple tuple);
+}
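The `Partitioner` Javadoc describes how the returned relative path is combined with the bolt's base directory. `NullPartitioner` returns the empty string, so no subdirectory is added. The sketch below illustrates this resolution with plain strings. The `FieldPartitioner` interface, the `Map`-based stand-in for a Storm `Tuple` and the `outputDir` helper are hypothetical. The city-based partitioner mirrors the one in `testPartitionedOutput`, which expects files under `/SFO` and `/SJO`.

```java
import java.util.Map;

class PartitionPathDemo {

    // Hypothetical stand-in for Partitioner: a plain field map replaces Storm's Tuple.
    interface FieldPartitioner {
        String getPartitionPath(Map<String, String> fields);
    }

    // Equivalent of NullPartitioner: no subdirectory is added.
    static final FieldPartitioner NULL_PARTITIONER = fields -> "";

    // Partition by the "city" field; "/" stands in for Hadoop's Path.SEPARATOR.
    static final FieldPartitioner BY_CITY = fields -> "/" + fields.get("city");

    // Hypothetical helper: directory a tuple's file would be opened in.
    static String outputDir(String basePath, FieldPartitioner p, Map<String, String> fields) {
        return basePath + p.getPartitionPath(fields);
    }

    public static void main(String[] args) {
        Map<String, String> sfo = Map.of("city", "SFO");
        System.out.println(outputDir("/common/output", NULL_PARTITIONER, sfo)); // /common/output
        System.out.println(outputDir("/common/output", BY_CITY, sfo));          // /common/output/SFO
    }
}
```

The real `Partitioner` extends `Serializable` because Storm serializes the bolt, including its partitioner, when the topology is submitted. An implementation should therefore avoid capturing non-serializable state.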
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
new file mode 100644
index 0000000..ec78fd6
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.storm.hdfs.bolt.format.SequenceFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class SequenceFileWriter extends AbstractHDFSWriter{
+
+ private static final Logger LOG = LoggerFactory.getLogger(SequenceFileWriter.class);
+
+ private SequenceFile.Writer writer;
+ private SequenceFormat format;
+
+ public SequenceFileWriter(FileRotationPolicy policy, Path path, SequenceFile.Writer writer, SequenceFormat format) {
+ super(policy, path);
+ this.writer = writer;
+ this.format = format;
+ }
+
+ @Override
+ protected void doWrite(Tuple tuple) throws IOException {
+ this.writer.append(this.format.key(tuple), this.format.value(tuple));
+ this.offset = this.writer.getLength();
+ }
+
+ @Override
+ protected void doSync() throws IOException {
+ LOG.debug("Attempting to sync all data to filesystem");
+ this.writer.hsync();
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ this.writer.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
index 8ff05bc..cd828da 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
@@ -27,7 +27,7 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
@@ -40,6 +40,7 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.junit.Before;
import org.junit.After;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.Assert;
@@ -65,28 +66,38 @@ public class AvroGenericRecordBoltTest {
private DistributedFileSystem fs;
private MiniDFSCluster hdfsCluster;
private static final String testRoot = "/unittest";
- private static final Schema schema;
+ private static final Schema schema1;
+ private static final Schema schema2;
private static final Tuple tuple1;
private static final Tuple tuple2;
- private static final String userSchema = "{\"type\":\"record\"," +
+ private static final String schemaV1 = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
"{ \"name\":\"int1\", \"type\":\"int\" }]}";
+ private static final String schemaV2 = "{\"type\":\"record\"," +
+ "\"name\":\"myrecord\"," +
+ "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
+ "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," +
+ "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+
static {
Schema.Parser parser = new Schema.Parser();
- schema = parser.parse(userSchema);
+ schema1 = parser.parse(schemaV1);
+
+ parser = new Schema.Parser();
+ schema2 = parser.parse(schemaV2);
- GenericRecord record1 = new GenericData.Record(schema);
- record1.put("foo1", "bar1");
- record1.put("int1", 1);
- tuple1 = generateTestTuple(record1);
+ GenericRecordBuilder builder1 = new GenericRecordBuilder(schema1);
+ builder1.set("foo1", "bar1");
+ builder1.set("int1", 1);
+ tuple1 = generateTestTuple(builder1.build());
- GenericRecord record2 = new GenericData.Record(schema);
- record2.put("foo1", "bar2");
- record2.put("int1", 2);
- tuple2 = generateTestTuple(record2);
+ GenericRecordBuilder builder2 = new GenericRecordBuilder(schema2);
+ builder2.set("foo1", "bar2");
+ builder2.set("int1", 2);
+ tuple2 = generateTestTuple(builder2.build());
}
@Mock private OutputCollector collector;
@@ -116,30 +127,76 @@ public class AvroGenericRecordBoltTest {
@Test public void multipleTuplesOneFile() throws IOException
{
- AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, userSchema);
+ AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, schemaV1);
bolt.prepare(new Config(), topologyContext, collector);
bolt.execute(tuple1);
- bolt.execute(tuple2);
bolt.execute(tuple1);
- bolt.execute(tuple2);
+ bolt.execute(tuple1);
+ bolt.execute(tuple1);
Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
- verifyAllAvroFiles(testRoot, schema);
+ verifyAllAvroFiles(testRoot);
}
@Test public void multipleTuplesMutliplesFiles() throws IOException
{
- AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, userSchema);
+ AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, schemaV1);
+
+ bolt.prepare(new Config(), topologyContext, collector);
+ bolt.execute(tuple1);
+ bolt.execute(tuple1);
+ bolt.execute(tuple1);
+ bolt.execute(tuple1);
+
+ Assert.assertEquals(4, countNonZeroLengthFiles(testRoot));
+ verifyAllAvroFiles(testRoot);
+ }
+
+ @Test public void forwardSchemaChangeWorks() throws IOException
+ {
+ AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV1);
+
+ bolt.prepare(new Config(), topologyContext, collector);
+ bolt.execute(tuple1);
+ bolt.execute(tuple2);
+
+ //Schema change should have forced a rotation
+ Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
+
+ verifyAllAvroFiles(testRoot);
+ }
+
+ @Test public void backwardSchemaChangeWorks() throws IOException
+ {
+ AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
+
+ bolt.prepare(new Config(), topologyContext, collector);
+ bolt.execute(tuple1);
+ bolt.execute(tuple2);
+
+ //Schema changes should have forced file rotations
+ Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
+ verifyAllAvroFiles(testRoot);
+ }
+
+ @Test public void schemaThrashing() throws IOException
+ {
+ AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
bolt.prepare(new Config(), topologyContext, collector);
bolt.execute(tuple1);
bolt.execute(tuple2);
bolt.execute(tuple1);
bolt.execute(tuple2);
+ bolt.execute(tuple1);
+ bolt.execute(tuple2);
+ bolt.execute(tuple1);
+ bolt.execute(tuple2);
- Assert.assertEquals(4, countNonZeroLengthFiles(testRoot));
- verifyAllAvroFiles(testRoot, schema);
+ //Two distinct schemas should result in only two files
+ Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
+ verifyAllAvroFiles(testRoot);
}
private AvroGenericRecordBolt makeAvroBolt(String nameNodeAddr, int countSync, float rotationSizeMB, String schemaAsString) {
@@ -154,7 +211,6 @@ public class AvroGenericRecordBoltTest {
return new AvroGenericRecordBolt()
.withFsUrl(nameNodeAddr)
.withFileNameFormat(fieldsFileNameFormat)
- .withSchemaAsString(schemaAsString)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(fieldsSyncPolicy);
}
@@ -171,12 +227,12 @@ public class AvroGenericRecordBoltTest {
return new TupleImpl(topologyContext, new Values(record), 1, "");
}
- private void verifyAllAvroFiles(String path, Schema schema) throws IOException {
+ private void verifyAllAvroFiles(String path) throws IOException {
Path p = new Path(path);
for (FileStatus file : fs.listStatus(p)) {
if (file.getLen() > 0) {
- fileIsGoodAvro(file.getPath(), schema);
+ fileIsGoodAvro(file.getPath());
}
}
}
@@ -194,8 +250,8 @@ public class AvroGenericRecordBoltTest {
return nonZero;
}
- private void fileIsGoodAvro (Path path, Schema schema) throws IOException {
- DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
+ private void fileIsGoodAvro (Path path) throws IOException {
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
FSDataInputStream in = fs.open(path, 0);
FileOutputStream out = new FileOutputStream("target/FOO.avro");
@@ -212,7 +268,6 @@ public class AvroGenericRecordBoltTest {
GenericRecord user = null;
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
- System.out.println(user);
}
file.delete();
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index ecbad8a..e8f0702 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -35,6 +35,7 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.Partitioner;
import org.junit.Before;
import org.junit.After;
import org.junit.Rule;
@@ -109,6 +110,30 @@ public class TestHdfsBolt {
}
@Test
+ public void testPartitionedOutput() throws IOException {
+ HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, 1000f);
+
+ Partitioner partitioner = new Partitioner() {
+ @Override
+ public String getPartitionPath(Tuple tuple) {
+ return Path.SEPARATOR + tuple.getStringByField("city");
+ }
+ };
+
+ bolt.prepare(new Config(), topologyContext, collector);
+ bolt.withPartitioner(partitioner);
+
+ bolt.execute(tuple1);
+ bolt.execute(tuple2);
+
+ verify(collector).ack(tuple1);
+ verify(collector).ack(tuple2);
+
+ Assert.assertEquals(1, countNonZeroLengthFiles(testRoot + "/SFO"));
+ Assert.assertEquals(1, countNonZeroLengthFiles(testRoot + "/SJO"));
+ }
+
+ @Test
public void testTwoTuplesOneFile() throws IOException
{
HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f);
@@ -127,8 +152,9 @@ public class TestHdfsBolt {
@Test
public void testFailedSync() throws IOException
{
- HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, .00001f);
+ HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f);
bolt.prepare(new Config(), topologyContext, collector);
+ bolt.execute(tuple1);
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
@@ -138,8 +164,8 @@ public class TestHdfsBolt {
}
- // One tuple and one rotation should yield one file with data and one zero length file
- // The failed executions should not cause rotations and new zero length files
+ // One tuple and one rotation should yield one file with data
+ // The failed executions should not cause rotations or any new files
@Test
public void testFailureFilecount() throws IOException, InterruptedException
{
@@ -168,7 +194,7 @@ public class TestHdfsBolt {
}
Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
- Assert.assertEquals(1, countZeroLengthFiles(testRoot));
+ Assert.assertEquals(0, countZeroLengthFiles(testRoot));
}
@Test
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
index 870d4ca..9913d9d 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
@@ -127,14 +127,14 @@ public class TestSequenceFileBolt {
@Test
public void testFailedSync() throws IOException
{
- SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 1, .00001f);
+ SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f);
bolt.prepare(new Config(), topologyContext, collector);
+ bolt.execute(tuple1);
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
// All writes/syncs will fail so this should cause a RuntimeException
thrown.expect(RuntimeException.class);
bolt.execute(tuple1);
-
}
private SequenceFileBolt makeSeqBolt(String nameNodeAddr, int countSync, float rotationSizeMB) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
new file mode 100644
index 0000000..fd99efe
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class TestWritersMap {
+
+ AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2);
+ @Mock AbstractHDFSWriter foo;
+ @Mock AbstractHDFSWriter bar;
+ @Mock AbstractHDFSWriter baz;
+
+ @Test public void testLRUBehavior()
+ {
+ map.put("FOO", foo);
+ map.put("BAR", bar);
+
+ //Access foo to make it most recently used
+ map.get("FOO");
+
+ //Add an element and bar should drop out
+ map.put("BAZ", baz);
+
+ Assert.assertTrue(map.keySet().contains("FOO"));
+ Assert.assertTrue(map.keySet().contains("BAZ"));
+
+ Assert.assertFalse(map.keySet().contains("BAR"));
+ }
+}
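`TestWritersMap` expects `AbstractHdfsBolt.WritersMap` to behave as a least-recently-used cache. With a capacity of 2, reading `FOO` and then adding `BAZ` evicts `BAR`. The `WritersMap` source is not part of this excerpt. A common way to get this behavior in Java, assumed here, is an access-ordered `LinkedHashMap` that overrides `removeEldestEntry`, as sketched below.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LruWritersDemo {

    // Access-ordered LinkedHashMap that evicts the least recently used entry
    // once more than maxWriters entries are present.
    static final class WritersMap<V> extends LinkedHashMap<String, V> {
        private final int maxWriters;

        WritersMap(int maxWriters) {
            super(16, 0.75f, true); // accessOrder = true: get() moves an entry to the tail
            this.maxWriters = maxWriters;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
            // This sketch only drops the entry; a bolt holding open files would
            // also have to close the evicted writer so its data is flushed.
            return size() > maxWriters;
        }
    }

    public static void main(String[] args) {
        WritersMap<String> map = new WritersMap<>(2);
        map.put("FOO", "foo");
        map.put("BAR", "bar");
        map.get("FOO");                   // FOO becomes most recently used
        map.put("BAZ", "baz");            // evicts BAR, the least recently used
        System.out.println(map.keySet()); // [FOO, BAZ]
    }
}
```

Passing `true` as the third constructor argument makes `get` count as a use. With the default insertion order, `FOO` would have been evicted instead.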
[3/3] storm git commit: Add 1464 to CHANGELOG
Posted by do...@apache.org.
Add 1464 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f48d7941
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f48d7941
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f48d7941
Branch: refs/heads/master
Commit: f48d7941b10483e87a30b4849321c4dc0844a5a5
Parents: 1b6724c
Author: Aaron Dossett <aa...@target.com>
Authored: Thu Apr 7 11:49:15 2016 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Thu Apr 7 11:49:15 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f48d7941/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 890ea48..d8bae07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1464: storm-hdfs support for multiple file outputs
* STORM-515: Clojure documentation and examples
* STORM-1279: port backtype.storm.daemon.supervisor to java
* STORM-1668: Flux silently fails while setting a non-existent property.
[2/3] storm git commit: Merge branch 'STORM-1494' of
https://github.com/dossett/storm into STORM-1464
Posted by do...@apache.org.
Merge branch 'STORM-1494' of https://github.com/dossett/storm into STORM-1464
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b6724c2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b6724c2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b6724c2
Branch: refs/heads/master
Commit: 1b6724c2ff4bc61b49c5d0d8956984bf4be68256
Parents: 4284b09 7554fe2
Author: Aaron Dossett <aa...@target.com>
Authored: Thu Apr 7 11:22:11 2016 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Thu Apr 7 11:22:11 2016 -0500
----------------------------------------------------------------------
external/storm-hdfs/README.md | 29 ++-
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 182 +++++++++++++------
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 75 +++-----
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 39 ++--
.../storm/hdfs/bolt/SequenceFileBolt.java | 35 ++--
.../hdfs/bolt/rotation/FileRotationPolicy.java | 5 +
.../bolt/rotation/FileSizeRotationPolicy.java | 8 +
.../hdfs/bolt/rotation/NoRotationPolicy.java | 5 +
.../hdfs/bolt/rotation/TimedRotationPolicy.java | 8 +
.../storm/hdfs/common/AbstractHDFSWriter.java | 68 +++++++
.../common/AvroGenericRecordHDFSWriter.java | 80 ++++++++
.../apache/storm/hdfs/common/HDFSWriter.java | 66 +++++++
.../storm/hdfs/common/NullPartitioner.java | 31 ++++
.../apache/storm/hdfs/common/Partitioner.java | 36 ++++
.../storm/hdfs/common/SequenceFileWriter.java | 59 ++++++
.../hdfs/bolt/AvroGenericRecordBoltTest.java | 105 ++++++++---
.../apache/storm/hdfs/bolt/TestHdfsBolt.java | 34 +++-
.../storm/hdfs/bolt/TestSequenceFileBolt.java | 4 +-
.../apache/storm/hdfs/bolt/TestWritersMap.java | 48 +++++
19 files changed, 730 insertions(+), 187 deletions(-)
----------------------------------------------------------------------