You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/08/14 20:01:17 UTC
[01/12] storm git commit: Support for exactly once semantics in
HdfsState
Repository: storm
Updated Branches:
refs/heads/master b8d5635e8 -> aa308e116
Support for exactly once semantics in HdfsState
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5631b9d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5631b9d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5631b9d4
Branch: refs/heads/master
Commit: 5631b9d4746f34127a1bc89cb4488d2b2d8ec9d7
Parents: 6a21b6a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jul 22 00:34:09 2015 +0530
Committer: Arun Iyer <ai...@Arun-Iyer-MBP.local>
Committed: Wed Jul 22 00:41:31 2015 +0530
----------------------------------------------------------------------
.../apache/storm/hdfs/trident/HdfsState.java | 336 +++++++++++++++----
.../trident/rotation/FileRotationPolicy.java | 8 +
.../rotation/FileSizeRotationPolicy.java | 5 +
.../hdfs/trident/rotation/NoRotationPolicy.java | 5 +
.../trident/rotation/TimedRotationPolicy.java | 5 +
5 files changed, 301 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 67fff88..9b6b48c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -17,14 +17,14 @@
*/
package org.apache.storm.hdfs.trident;
+import backtype.storm.Config;
import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
@@ -40,8 +40,7 @@ import storm.trident.operation.TridentCollector;
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
import java.net.URI;
import java.util.*;
@@ -60,6 +59,10 @@ public class HdfsState implements State {
protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
protected transient Object writeLock;
protected transient Timer rotationTimer;
+ /**
+ * This is on by default unless TimedRotationPolicy is in use.
+ */
+ private boolean exactlyOnce = true;
abstract void closeOutputFile() throws IOException;
@@ -69,17 +72,28 @@ public class HdfsState implements State {
abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
- protected void rotateOutputFile() throws IOException {
+ abstract long getCurrentOffset() throws IOException;
+
+ abstract void doCommit(Long txId) throws IOException;
+
+ abstract void doRecover(Path srcPath, long nBytes) throws Exception;
+
+ protected boolean isExactlyOnce() {
+ return this.exactlyOnce;
+ }
+
+ protected void rotateOutputFile(boolean doRotateAction) throws IOException {
LOG.info("Rotating output file...");
long start = System.currentTimeMillis();
synchronized (this.writeLock) {
closeOutputFile();
this.rotation++;
-
Path newFile = createOutputFile();
- LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
- for (RotationAction action : this.rotationActions) {
- action.execute(this.fs, this.currentFile);
+ if (doRotateAction) {
+ LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
+ for (RotationAction action : this.rotationActions) {
+ action.execute(this.fs, this.currentFile);
+ }
}
this.currentFile = newFile;
}
@@ -89,38 +103,49 @@ public class HdfsState implements State {
}
- void prepare(Map conf, int partitionIndex, int numPartitions){
+ protected void rotateOutputFile() throws IOException {
+ rotateOutputFile(true);
+ }
+
+
+ void prepare(Map conf, int partitionIndex, int numPartitions) {
this.writeLock = new Object();
- if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
+ if (this.rotationPolicy == null) {
+ throw new IllegalStateException("RotationPolicy must be specified.");
+ } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
+ LOG.warn("*** Exactly once semantics is not supported with TimedRotationPolicy ***");
+ LOG.warn("*** Turning off exactly once.");
+ this.exactlyOnce = false;
+ }
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be specified.");
}
this.fileNameFormat.prepare(conf, partitionIndex, numPartitions);
this.hdfsConfig = new Configuration();
- Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
- if(map != null){
- for(String key : map.keySet()){
+ Map<String, Object> map = (Map<String, Object>) conf.get(this.configKey);
+ if (map != null) {
+ for (String key : map.keySet()) {
this.hdfsConfig.set(key, String.valueOf(map.get(key)));
}
}
- try{
+ try {
HdfsSecurityUtil.login(conf, hdfsConfig);
doPrepare(conf, partitionIndex, numPartitions);
this.currentFile = createOutputFile();
- } catch (Exception e){
+ } catch (Exception e) {
throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
}
- if(this.rotationPolicy instanceof TimedRotationPolicy){
- long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+ if (this.rotationPolicy instanceof TimedRotationPolicy) {
+ long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
this.rotationTimer = new Timer(true);
TimerTask task = new TimerTask() {
@Override
public void run() {
try {
rotateOutputFile();
- } catch(IOException e){
+ } catch (IOException e) {
LOG.warn("IOException during scheduled file rotation.", e);
}
}
@@ -129,6 +154,26 @@ public class HdfsState implements State {
}
}
+ /**
+ * Recovers nBytes from srcFile to the new file created
+ * by calling rotateOutputFile and then deletes the srcFile.
+ */
+ private void recover(String srcFile, long nBytes) {
+ try {
+ Path srcPath = new Path(srcFile);
+ if (nBytes > 0) {
+ rotateOutputFile(false);
+ this.rotationPolicy.reset();
+ doRecover(srcPath, nBytes);
+ LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+ }
+ fs.delete(srcPath, false);
+ } catch (Exception e) {
+ LOG.warn("Recovery failed.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
}
public static class HdfsFileOptions extends Options {
@@ -137,32 +182,33 @@ public class HdfsState implements State {
protected RecordFormat format;
private long offset = 0;
- public HdfsFileOptions withFsUrl(String fsUrl){
+ public HdfsFileOptions withFsUrl(String fsUrl) {
this.fsUrl = fsUrl;
return this;
}
- public HdfsFileOptions withConfigKey(String configKey){
+ public HdfsFileOptions withConfigKey(String configKey) {
this.configKey = configKey;
return this;
}
- public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat){
+ public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat) {
this.fileNameFormat = fileNameFormat;
return this;
}
- public HdfsFileOptions withRecordFormat(RecordFormat format){
+ public HdfsFileOptions withRecordFormat(RecordFormat format) {
this.format = format;
return this;
}
- public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy){
+ public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy) {
this.rotationPolicy = rotationPolicy;
return this;
}
- public HdfsFileOptions addRotationAction(RotationAction action){
+ @Deprecated
+ public HdfsFileOptions addRotationAction(RotationAction action) {
this.rotationActions.add(action);
return this;
}
@@ -174,6 +220,45 @@ public class HdfsState implements State {
}
@Override
+ public long getCurrentOffset() {
+ return offset;
+ }
+
+ @Override
+ public void doCommit(Long txId) throws IOException {
+ synchronized (writeLock) {
+ if (this.rotationPolicy.mark(this.offset)) {
+ rotateOutputFile();
+ this.offset = 0;
+ this.rotationPolicy.reset();
+ } else {
+ if (this.out instanceof HdfsDataOutputStream) {
+ ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ } else {
+ this.out.hsync();
+ }
+ }
+ }
+ }
+
+ @Override
+ void doRecover(Path srcPath, long nBytes) throws IOException {
+ this.offset = 0;
+ FSDataInputStream is = this.fs.open(srcPath);
+ copyBytes(is, out, nBytes);
+ this.offset = nBytes;
+ }
+
+ private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
+ byte[] buf = new byte[4096];
+ int n;
+ while ((n = is.read(buf)) != -1 && bytesToCopy > 0) {
+ out.write(buf, 0, (int) Math.min(n, bytesToCopy));
+ bytesToCopy -= n;
+ }
+ }
+
+ @Override
void closeOutputFile() throws IOException {
this.out.close();
}
@@ -187,26 +272,11 @@ public class HdfsState implements State {
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
- boolean rotated = false;
synchronized (this.writeLock) {
for (TridentTuple tuple : tuples) {
byte[] bytes = this.format.format(tuple);
out.write(bytes);
this.offset += bytes.length;
-
- if (this.rotationPolicy.mark(tuple, this.offset)) {
- rotateOutputFile();
- this.offset = 0;
- this.rotationPolicy.reset();
- rotated = true;
- }
- }
- if (!rotated) {
- if (this.out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
- } else {
- this.out.hsync();
- }
}
}
}
@@ -219,7 +289,7 @@ public class HdfsState implements State {
private String compressionCodec = "default";
private transient CompressionCodecFactory codecFactory;
- public SequenceFileOptions withCompressionCodec(String codec){
+ public SequenceFileOptions withCompressionCodec(String codec) {
this.compressionCodec = codec;
return this;
}
@@ -229,7 +299,7 @@ public class HdfsState implements State {
return this;
}
- public SequenceFileOptions withConfigKey(String configKey){
+ public SequenceFileOptions withConfigKey(String configKey) {
this.configKey = configKey;
return this;
}
@@ -249,12 +319,12 @@ public class HdfsState implements State {
return this;
}
- public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType){
+ public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType) {
this.compressionType = compressionType;
return this;
}
- public SequenceFileOptions addRotationAction(RotationAction action){
+ public SequenceFileOptions addRotationAction(RotationAction action) {
this.rotationActions.add(action);
return this;
}
@@ -269,6 +339,36 @@ public class HdfsState implements State {
}
@Override
+ public long getCurrentOffset() throws IOException {
+ return this.writer.getLength();
+ }
+
+ @Override
+ public void doCommit(Long txId) throws IOException {
+ synchronized (writeLock) {
+ if (this.rotationPolicy.mark(this.writer.getLength())) {
+ rotateOutputFile();
+ this.rotationPolicy.reset();
+ } else {
+ this.writer.hsync();
+ }
+ }
+ }
+
+
+ @Override
+ void doRecover(Path srcPath, long nBytes) throws Exception {
+ SequenceFile.Reader reader = new SequenceFile.Reader(this.hdfsConfig,
+ SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes));
+
+ Writable key = (Writable) this.format.keyClass().newInstance();
+ Writable value = (Writable) this.format.valueClass().newInstance();
+ while(reader.next(key, value)) {
+ this.writer.append(key, value);
+ }
+ }
+
+ @Override
Path createOutputFile() throws IOException {
Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
this.writer = SequenceFile.createWriter(
@@ -288,45 +388,165 @@ public class HdfsState implements State {
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
- long offset;
- for(TridentTuple tuple : tuples) {
+ for (TridentTuple tuple : tuples) {
synchronized (this.writeLock) {
this.writer.append(this.format.key(tuple), this.format.value(tuple));
- offset = this.writer.getLength();
- }
-
- if (this.rotationPolicy.mark(tuple, offset)) {
- rotateOutputFile();
- this.rotationPolicy.reset();
}
}
}
}
+ /**
+ * TxnRecord [txnid, data_file_path, data_file_offset]
+ * <p>
+ * This is written to the index file during beginCommit() and used for recovery.
+ * </p>
+ */
+ private static class TxnRecord {
+ private long txnid;
+ private String dataFilePath;
+ private long offset;
+
+ private TxnRecord(long txnId, String dataFilePath, long offset) {
+ this.txnid = txnId;
+ this.dataFilePath = dataFilePath;
+ this.offset = offset;
+ }
+
+ @Override
+ public String toString() {
+ return Long.toString(txnid) + "," + dataFilePath + "," + Long.toString(offset);
+ }
+ }
+
+
public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
private Options options;
+ private volatile TxnRecord lastSeenTxn;
+ private Path indexFilePath;
- HdfsState(Options options){
+ HdfsState(Options options) {
this.options = options;
}
- void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions){
+ void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
this.options.prepare(conf, partitionIndex, numPartitions);
+ if (options.isExactlyOnce()) {
+ initLastTxn(conf, partitionIndex);
+ }
+ }
+
+ private TxnRecord readTxnRecord(Path path) throws IOException {
+ FSDataInputStream inputStream = null;
+ try {
+ inputStream = this.options.fs.open(path);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+ String line;
+ if ((line = reader.readLine()) != null) {
+ String[] fields = line.split(",");
+ return new TxnRecord(Long.valueOf(fields[0]), fields[1], Long.valueOf(fields[2]));
+ }
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+ return new TxnRecord(0, options.currentFile.toString(), 0);
+ }
+
+ /**
+ * Reads the last txn record from index file if it exists, if not
+ * from .tmp file if exists.
+ *
+ * @param indexFilePath the index file path
+ * @return the txn record from the index file or a default initial record.
+ * @throws IOException
+ */
+ private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
+ Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
+ if (this.options.fs.exists(indexFilePath)) {
+ return readTxnRecord(indexFilePath);
+ } else if (this.options.fs.exists(tmpPath)) {
+ return readTxnRecord(tmpPath);
+ }
+ return new TxnRecord(0, options.currentFile.toString(), 0);
+ }
+
+ private void initLastTxn(Map conf, int partition) {
+ // include partition id in the file name so that index for different partitions are independent.
+ String indexFileName = String.format(".index.%s.%d", conf.get(Config.TOPOLOGY_NAME), partition);
+ this.indexFilePath = new Path(options.fileNameFormat.getPath(), indexFileName);
+ try {
+ this.lastSeenTxn = getTxnRecord(indexFilePath);
+ LOG.debug("initLastTxn updated lastSeenTxn to [{}]", this.lastSeenTxn);
+ } catch (IOException e) {
+ LOG.warn("initLastTxn failed due to IOException.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void updateIndex(long txId) {
+ FSDataOutputStream out = null;
+ LOG.debug("Starting index update.");
+ try {
+ Path tmpPath = new Path(this.indexFilePath.toString() + ".tmp");
+ out = this.options.fs.create(tmpPath, true);
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
+ TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset());
+ bw.write(txnRecord.toString());
+ bw.newLine();
+ bw.flush();
+ /*
+ * Delete the current index file and rename the tmp file to atomically
+ * replace the index file. Orphan .tmp files are handled in getTxnRecord.
+ */
+ options.fs.delete(this.indexFilePath, false);
+ options.fs.rename(tmpPath, this.indexFilePath);
+ lastSeenTxn = txnRecord;
+ LOG.debug("updateIndex updated lastSeenTxn to [{}]", this.lastSeenTxn);
+ } catch (IOException e) {
+ LOG.warn("Begin commit failed due to IOException. Failing batch", e);
+ throw new FailedException(e);
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ LOG.warn("Begin commit failed due to IOException. Failing batch", e);
+ throw new FailedException(e);
+ }
+ }
+ }
}
@Override
public void beginCommit(Long txId) {
+ if (options.isExactlyOnce()) {
+ if (txId <= lastSeenTxn.txnid) {
+ LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+ long start = System.currentTimeMillis();
+ options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+ LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
+ }
+ updateIndex(txId);
+ }
}
@Override
public void commit(Long txId) {
+ try {
+ options.doCommit(txId);
+ } catch (IOException e) {
+ LOG.warn("Commit failed due to IOException. Failing the batch.", e);
+ throw new FailedException(e);
+ }
}
- public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector){
- try{
+ public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) {
+ try {
this.options.execute(tuples);
- } catch (IOException e){
+ } catch (IOException e) {
LOG.warn("Failing batch due to IOException.", e);
throw new FailedException(e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index 3db56f8..89fd918 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -41,6 +41,14 @@ public interface FileRotationPolicy extends Serializable {
*/
boolean mark(TridentTuple tuple, long offset);
+ /**
+ * Check if a file rotation should be performed based on
+ * the offset at which file is being written.
+ *
+ * @param offset the current offset of file being written
+ * @return true if a file rotation should be performed.
+ */
+ boolean mark(long offset);
/**
* Called after the HdfsBolt rotates a file.
http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 93f0d58..5d99108 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -66,6 +66,11 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
@Override
public boolean mark(TridentTuple tuple, long offset) {
+ return mark(offset);
+ }
+
+ @Override
+ public boolean mark(long offset) {
long diff = offset - this.lastOffset;
this.currentBytesWritten += diff;
this.lastOffset = offset;
http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
index 50130e5..caabee5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
@@ -30,6 +30,11 @@ public class NoRotationPolicy implements FileRotationPolicy {
}
@Override
+ public boolean mark(long offset) {
+ return false;
+ }
+
+ @Override
public void reset() {
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index a75e2e5..f407d1d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -57,6 +57,11 @@ public class TimedRotationPolicy implements FileRotationPolicy {
return false;
}
+ @Override
+ public boolean mark(long offset) {
+ return false;
+ }
+
/**
* Called after the HdfsBolt rotates a file.
*/
[08/12] storm git commit: updated README.md
Posted by da...@apache.org.
updated README.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d8c51b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d8c51b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d8c51b4
Branch: refs/heads/master
Commit: 4d8c51b4503110f03a27b401f096ff2477bbb7cf
Parents: cdabbb3
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 10 14:01:24 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 10 14:01:24 2015 +0530
----------------------------------------------------------------------
external/storm-hdfs/README.md | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8c51b4/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 98fcc55..e1f2b28 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -323,6 +323,9 @@ duplicates from the current data file by copying the data up to the last transac
operation involves a lot of data copy, ensure that the data files are rotated at reasonable sizes with `FileSizeRotationPolicy`
and at reasonable intervals with `TimedRotationPolicy` so that the recovery can complete within topology.message.timeout.secs.
+Also note with `TimedRotationPolicy` the files are never rotated in the middle of a batch even if the timer ticks,
+but only when a batch completes so that complete batches can be efficiently recovered in case of failures.
+
##Working with Secure HDFS
If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We
currently have 2 options to support this:
[03/12] storm git commit: Make the buffer size in file copy an option
and set a reasonable default
Posted by da...@apache.org.
Make the buffer size in file copy an option and set a reasonable default
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e71e2ddf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e71e2ddf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e71e2ddf
Branch: refs/heads/master
Commit: e71e2ddfafe8bfb082320c2c3b40f4fb2a3d4995
Parents: 579dc87
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jul 22 12:31:46 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Jul 22 12:31:46 2015 +0530
----------------------------------------------------------------------
.../main/java/org/apache/storm/hdfs/trident/HdfsState.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e71e2ddf/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 9b9ba8e..8d32b32 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -193,6 +193,7 @@ public class HdfsState implements State {
private transient FSDataOutputStream out;
protected RecordFormat format;
private long offset = 0;
+ private int bufferSize = 131072; // default 128 K
public HdfsFileOptions withFsUrl(String fsUrl) {
this.fsUrl = fsUrl;
@@ -219,6 +220,11 @@ public class HdfsState implements State {
return this;
}
+ public HdfsFileOptions withBufferSize(int size) {
+ this.bufferSize = Math.max(4096, size); // at least 4K
+ return this;
+ }
+
@Deprecated
public HdfsFileOptions addRotationAction(RotationAction action) {
this.rotationActions.add(action);
@@ -262,7 +268,7 @@ public class HdfsState implements State {
}
private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
- byte[] buf = new byte[4096];
+ byte[] buf = new byte[bufferSize];
int n;
while ((n = is.read(buf)) != -1 && bytesToCopy > 0) {
out.write(buf, 0, (int) Math.min(n, bytesToCopy));
[10/12] storm git commit: Merge branch 'master' of
https://github.com/arunmahadevan/storm into merge-STORM-837
Posted by da...@apache.org.
Merge branch 'master' of https://github.com/arunmahadevan/storm into merge-STORM-837
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/430375c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/430375c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/430375c0
Branch: refs/heads/master
Commit: 430375c01ebfbc0342e84c5068872d13292b8edf
Parents: b8d5635 c44e02b
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri Aug 14 12:15:14 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri Aug 14 12:15:14 2015 -0500
----------------------------------------------------------------------
external/storm-hdfs/README.md | 9 +
external/storm-hdfs/pom.xml | 11 +
.../apache/storm/hdfs/trident/HdfsState.java | 392 +++++++++++++++----
.../trident/rotation/FileRotationPolicy.java | 14 +
.../rotation/FileSizeRotationPolicy.java | 13 +
.../hdfs/trident/rotation/NoRotationPolicy.java | 10 +
.../trident/rotation/TimedRotationPolicy.java | 31 +-
.../storm/hdfs/trident/HdfsStateTest.java | 206 ++++++++++
8 files changed, 602 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
[12/12] storm git commit: Merge STORM-837
Posted by da...@apache.org.
Merge STORM-837
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aa308e11
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aa308e11
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aa308e11
Branch: refs/heads/master
Commit: aa308e11685b8b7945f2d4963e9f63dfd322087b
Parents: b8d5635 a3fa9b1
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri Aug 14 12:57:53 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri Aug 14 12:57:53 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
README.markdown | 1 +
external/storm-hdfs/README.md | 9 +
external/storm-hdfs/pom.xml | 11 +
.../apache/storm/hdfs/trident/HdfsState.java | 392 +++++++++++++++----
.../trident/rotation/FileRotationPolicy.java | 14 +
.../rotation/FileSizeRotationPolicy.java | 13 +
.../hdfs/trident/rotation/NoRotationPolicy.java | 10 +
.../trident/rotation/TimedRotationPolicy.java | 31 +-
.../storm/hdfs/trident/HdfsStateTest.java | 206 ++++++++++
10 files changed, 604 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
[05/12] storm git commit: Added doc to explain when exactly once
semantics is supported; Auto disable exactly once if file size > 1GB.
Posted by da...@apache.org.
Added doc to explain when exactly once semantics is supported; Auto disable exactly once if file size > 1GB.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8505c3ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8505c3ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8505c3ef
Branch: refs/heads/master
Commit: 8505c3ef2d5e32c1716f49f6928851363c25df91
Parents: 85eadd7
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 3 12:02:52 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 3 12:02:52 2015 +0530
----------------------------------------------------------------------
external/storm-hdfs/README.md | 9 +++++++
.../apache/storm/hdfs/trident/HdfsState.java | 26 +++++++++++++++++---
.../rotation/FileSizeRotationPolicy.java | 3 +++
3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index b37af7e..bb4a618 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -317,6 +317,15 @@ that of the bolts.
.addRotationAction(new MoveFileAction().toDestination("/dest2/"));
```
+### Note
+Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes
+duplicates from the current data file by copying the data up to the last transaction to another file . Since this
+operation involves a lot of data copy, the exactly once semantics is enabled only if `FileSizeRotationPolicy` with
+file size less than 1 GB is specified.
+
+The exactly once semantics is automatically disabled if `FileSizeRotationPolicy` with size greater than 1 GB or
+`TimedRotationPolicy` is in use.
+
##Working with Secure HDFS
If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We
currently have 2 options to support this:
http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 8d32b32..3dc566c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -35,6 +35,7 @@ import org.apache.storm.hdfs.trident.format.FileNameFormat;
import org.apache.storm.hdfs.trident.format.RecordFormat;
import org.apache.storm.hdfs.trident.format.SequenceFormat;
import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,9 +125,16 @@ public class HdfsState implements State {
this.writeLock = new Object();
if (this.rotationPolicy == null) {
throw new IllegalStateException("RotationPolicy must be specified.");
+ } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
+ long limit = FileSizeRotationPolicy.Units.GB.getByteCount();
+ if(((FileSizeRotationPolicy) rotationPolicy).getMaxBytes() > limit) {
+ LOG.warn("*** Exactly once semantics is not supported for FileSizeRotationPolicy with size > 1 GB ***");
+ LOG.warn("Turning off exactly once.");
+ this.exactlyOnce = false;
+ }
} else if (this.rotationPolicy instanceof TimedRotationPolicy) {
- LOG.warn("*** Exactly once semantics is not supported with TimedRotationPolicy ***");
- LOG.warn("*** Turning off exactly once.");
+ LOG.warn("*** Exactly once semantics is not supported for TimedRotationPolicy ***");
+ LOG.warn("Turning off exactly once.");
this.exactlyOnce = false;
}
if (this.fsUrl == null) {
@@ -220,8 +228,18 @@ public class HdfsState implements State {
return this;
}
- public HdfsFileOptions withBufferSize(int size) {
- this.bufferSize = Math.max(4096, size); // at least 4K
+ /**
+ * <p>Set the size of the buffer used for hdfs file copy in case of recovery. The default
+ * value is 131072.</p>
+ *
+ * <p> Note: The lower limit for the parameter is 4096, below which the
+ * option is ignored. </p>
+ *
+ * @param sizeInBytes the buffer size in bytes
+ * @return {@link HdfsFileOptions}
+ */
+ public HdfsFileOptions withBufferSize(int sizeInBytes) {
+ this.bufferSize = Math.max(4096, sizeInBytes); // at least 4K
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 5d99108..79b4f75 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -83,4 +83,7 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
this.lastOffset = 0;
}
+ public long getMaxBytes() {
+ return maxBytes;
+ }
}
[02/12] storm git commit: Removing wildcard imports
Posted by da...@apache.org.
Removing wildcard imports
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/579dc87f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/579dc87f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/579dc87f
Branch: refs/heads/master
Commit: 579dc87f04cf24fc9419945d54519fce09584178
Parents: 5631b9d
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jul 22 10:14:24 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Jul 22 10:14:43 2015 +0530
----------------------------------------------------------------------
.../apache/storm/hdfs/trident/HdfsState.java | 20 ++++++++++++++++----
1 file changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/579dc87f/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 9b6b48c..9b9ba8e 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -21,7 +21,10 @@ import backtype.storm.Config;
import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
@@ -32,7 +35,6 @@ import org.apache.storm.hdfs.trident.format.FileNameFormat;
import org.apache.storm.hdfs.trident.format.RecordFormat;
import org.apache.storm.hdfs.trident.format.SequenceFormat;
import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
-
import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,9 +42,19 @@ import storm.trident.operation.TridentCollector;
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
public class HdfsState implements State {
[04/12] storm git commit: Merge remote-tracking branch
'upstream/master'
Posted by da...@apache.org.
Merge remote-tracking branch 'upstream/master'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85eadd79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85eadd79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85eadd79
Branch: refs/heads/master
Commit: 85eadd790198a506a020fb4d1fd3247741b7db64
Parents: e71e2dd 9b2fd72
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Jul 24 09:09:51 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Fri Jul 24 09:09:51 2015 +0530
----------------------------------------------------------------------
CHANGELOG.md | 1 +
.../src/clj/backtype/storm/daemon/task.clj | 1 -
storm-core/src/clj/backtype/storm/tuple.clj | 6 ++-
.../storm/grouping/PartialKeyGrouping.java | 27 +++++++++++-
.../storm/testing/TestWordBytesCounter.java | 27 ++++++++++++
.../backtype/storm/testing/TestWordCounter.java | 6 ++-
.../test/clj/backtype/storm/grouping_test.clj | 43 +++++++++++++-------
7 files changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
[07/12] storm git commit: Support exactly once always;
added unit tests.
Posted by da...@apache.org.
Support exactly once always; added unit tests.
1. Enable exactly once irrespective of the rotation policy in use.
2. For TimedRotationPolicy set a flag and do the actual rotation in doCommit.
3. Added unit tests and updated README.md as per the new behavior.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cdabbb3d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cdabbb3d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cdabbb3d
Branch: refs/heads/master
Commit: cdabbb3db273895a79939f40894b999167532125
Parents: df63f75
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 10 13:09:53 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 10 13:09:53 2015 +0530
----------------------------------------------------------------------
external/storm-hdfs/README.md | 9 +-
external/storm-hdfs/pom.xml | 11 +
.../apache/storm/hdfs/trident/HdfsState.java | 142 +++++--------
.../trident/rotation/TimedRotationPolicy.java | 27 ++-
.../storm/hdfs/trident/HdfsStateTest.java | 206 +++++++++++++++++++
5 files changed, 299 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index bb4a618..98fcc55 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -319,12 +319,9 @@ that of the bolts.
### Note
Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes
-duplicates from the current data file by copying the data up to the last transaction to another file . Since this
-operation involves a lot of data copy, the exactly once semantics is enabled only if `FileSizeRotationPolicy` with
-file size less than 1 GB is specified.
-
-The exactly once semantics is automatically disabled if `FileSizeRotationPolicy` with size greater than 1 GB or
-`TimedRotationPolicy` is in use.
+duplicates from the current data file by copying the data up to the last transaction to another file. Since this
+operation involves a lot of data copy, ensure that the data files are rotated at reasonable sizes with `FileSizeRotationPolicy`
+and at reasonable intervals with `TimedRotationPolicy` so that the recovery can complete within topology.message.timeout.secs.
##Working with Secure HDFS
If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 7e927bf..15e6f72 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -68,6 +68,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 3dc566c..8b29f66 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -54,8 +54,6 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
public class HdfsState implements State {
@@ -70,12 +68,7 @@ public class HdfsState implements State {
protected int rotation = 0;
protected transient Configuration hdfsConfig;
protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
- protected transient Object writeLock;
- protected transient Timer rotationTimer;
- /**
- * This is on by default unless TimedRotationPolicy is in use.
- */
- private boolean exactlyOnce = true;
+
abstract void closeOutputFile() throws IOException;
@@ -91,29 +84,21 @@ public class HdfsState implements State {
abstract void doRecover(Path srcPath, long nBytes) throws Exception;
- protected boolean isExactlyOnce() {
- return this.exactlyOnce;
- }
-
protected void rotateOutputFile(boolean doRotateAction) throws IOException {
LOG.info("Rotating output file...");
long start = System.currentTimeMillis();
- synchronized (this.writeLock) {
- closeOutputFile();
- this.rotation++;
- Path newFile = createOutputFile();
- if (doRotateAction) {
- LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
- for (RotationAction action : this.rotationActions) {
- action.execute(this.fs, this.currentFile);
- }
+ closeOutputFile();
+ this.rotation++;
+ Path newFile = createOutputFile();
+ if (doRotateAction) {
+ LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
+ for (RotationAction action : this.rotationActions) {
+ action.execute(this.fs, this.currentFile);
}
- this.currentFile = newFile;
}
+ this.currentFile = newFile;
long time = System.currentTimeMillis() - start;
LOG.info("File rotation took {} ms.", time);
-
-
}
protected void rotateOutputFile() throws IOException {
@@ -122,20 +107,17 @@ public class HdfsState implements State {
void prepare(Map conf, int partitionIndex, int numPartitions) {
- this.writeLock = new Object();
if (this.rotationPolicy == null) {
throw new IllegalStateException("RotationPolicy must be specified.");
} else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
- long limit = FileSizeRotationPolicy.Units.GB.getByteCount();
- if(((FileSizeRotationPolicy) rotationPolicy).getMaxBytes() > limit) {
- LOG.warn("*** Exactly once semantics is not supported for FileSizeRotationPolicy with size > 1 GB ***");
- LOG.warn("Turning off exactly once.");
- this.exactlyOnce = false;
- }
+ long rotationBytes = ((FileSizeRotationPolicy) rotationPolicy).getMaxBytes();
+ LOG.warn("FileSizeRotationPolicy specified with {} bytes.", rotationBytes);
+ LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big with the FileSizeRotationPolicy.");
} else if (this.rotationPolicy instanceof TimedRotationPolicy) {
- LOG.warn("*** Exactly once semantics is not supported for TimedRotationPolicy ***");
- LOG.warn("Turning off exactly once.");
- this.exactlyOnce = false;
+ LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
+ LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big with the TimedRotationPolicy.");
}
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be specified.");
@@ -158,19 +140,7 @@ public class HdfsState implements State {
}
if (this.rotationPolicy instanceof TimedRotationPolicy) {
- long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
- this.rotationTimer = new Timer(true);
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- try {
- rotateOutputFile();
- } catch (IOException e) {
- LOG.warn("IOException during scheduled file rotation.", e);
- }
- }
- };
- this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+ ((TimedRotationPolicy) this.rotationPolicy).start();
}
}
@@ -181,13 +151,16 @@ public class HdfsState implements State {
private void recover(String srcFile, long nBytes) {
try {
Path srcPath = new Path(srcFile);
+ rotateOutputFile(false);
+ this.rotationPolicy.reset();
if (nBytes > 0) {
- rotateOutputFile(false);
- this.rotationPolicy.reset();
doRecover(srcPath, nBytes);
LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+ } else {
+ LOG.info("Nothing to recover from {}", srcFile);
}
fs.delete(srcPath, false);
+ LOG.info("Deleted file {} that had partial commits.", srcFile);
} catch (Exception e) {
LOG.warn("Recovery failed.", e);
throw new RuntimeException(e);
@@ -251,7 +224,7 @@ public class HdfsState implements State {
@Override
void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
- LOG.info("Preparing HDFS Bolt...");
+ LOG.info("Preparing HDFS File state...");
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
}
@@ -262,17 +235,15 @@ public class HdfsState implements State {
@Override
public void doCommit(Long txId) throws IOException {
- synchronized (writeLock) {
- if (this.rotationPolicy.mark(this.offset)) {
- rotateOutputFile();
- this.offset = 0;
- this.rotationPolicy.reset();
+ if (this.rotationPolicy.mark(this.offset)) {
+ rotateOutputFile();
+ this.offset = 0;
+ this.rotationPolicy.reset();
+ } else {
+ if (this.out instanceof HdfsDataOutputStream) {
+ ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
} else {
- if (this.out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
- } else {
- this.out.hsync();
- }
+ this.out.hsync();
}
}
}
@@ -308,12 +279,10 @@ public class HdfsState implements State {
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
- synchronized (this.writeLock) {
- for (TridentTuple tuple : tuples) {
- byte[] bytes = this.format.format(tuple);
- out.write(bytes);
- this.offset += bytes.length;
- }
+ for (TridentTuple tuple : tuples) {
+ byte[] bytes = this.format.format(tuple);
+ out.write(bytes);
+ this.offset += bytes.length;
}
}
}
@@ -381,13 +350,11 @@ public class HdfsState implements State {
@Override
public void doCommit(Long txId) throws IOException {
- synchronized (writeLock) {
- if (this.rotationPolicy.mark(this.writer.getLength())) {
- rotateOutputFile();
- this.rotationPolicy.reset();
- } else {
- this.writer.hsync();
- }
+ if (this.rotationPolicy.mark(this.writer.getLength())) {
+ rotateOutputFile();
+ this.rotationPolicy.reset();
+ } else {
+ this.writer.hsync();
}
}
@@ -425,9 +392,7 @@ public class HdfsState implements State {
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
for (TridentTuple tuple : tuples) {
- synchronized (this.writeLock) {
- this.writer.append(this.format.key(tuple), this.format.value(tuple));
- }
+ this.writer.append(this.format.key(tuple), this.format.value(tuple));
}
}
@@ -468,9 +433,7 @@ public class HdfsState implements State {
void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
this.options.prepare(conf, partitionIndex, numPartitions);
- if (options.isExactlyOnce()) {
- initLastTxn(conf, partitionIndex);
- }
+ initLastTxn(conf, partitionIndex);
}
private TxnRecord readTxnRecord(Path path) throws IOException {
@@ -558,15 +521,13 @@ public class HdfsState implements State {
@Override
public void beginCommit(Long txId) {
- if (options.isExactlyOnce()) {
- if (txId <= lastSeenTxn.txnid) {
- LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
- long start = System.currentTimeMillis();
- options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
- LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
- }
- updateIndex(txId);
+ if (txId <= lastSeenTxn.txnid) {
+ LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+ long start = System.currentTimeMillis();
+ options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+ LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
}
+ updateIndex(txId);
}
@Override
@@ -587,4 +548,11 @@ public class HdfsState implements State {
throw new FailedException(e);
}
}
+
+ /**
+ * for unit tests
+ */
+ void close() throws IOException {
+ this.options.closeOutputFile();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index f407d1d..74e7fab 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -19,6 +19,10 @@ package org.apache.storm.hdfs.trident.rotation;
import storm.trident.tuple.TridentTuple;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class TimedRotationPolicy implements FileRotationPolicy {
@@ -41,6 +45,9 @@ public class TimedRotationPolicy implements FileRotationPolicy {
}
private long interval;
+ private Timer rotationTimer;
+ private AtomicBoolean rotationTimerTriggered = new AtomicBoolean();
+
public TimedRotationPolicy(float count, TimeUnit units){
this.interval = (long)(count * units.getMilliSeconds());
@@ -54,12 +61,12 @@ public class TimedRotationPolicy implements FileRotationPolicy {
*/
@Override
public boolean mark(TridentTuple tuple, long offset) {
- return false;
+ return rotationTimerTriggered.get();
}
@Override
public boolean mark(long offset) {
- return false;
+ return rotationTimerTriggered.get();
}
/**
@@ -67,10 +74,24 @@ public class TimedRotationPolicy implements FileRotationPolicy {
*/
@Override
public void reset() {
-
+ rotationTimerTriggered.set(false);
}
public long getInterval(){
return this.interval;
}
+
+ /**
+ * Start the timer to run at fixed intervals.
+ */
+ public void start() {
+ rotationTimer = new Timer(true);
+ TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ rotationTimerTriggered.set(true);
+ }
+ };
+ rotationTimer.scheduleAtFixedRate(task, interval, interval);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
new file mode 100644
index 0000000..e1f45df
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
@@ -0,0 +1,206 @@
+package org.apache.storm.hdfs.trident;
+
+import backtype.storm.Config;
+import backtype.storm.tuple.Fields;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class HdfsStateTest {
+
+ private static final String TEST_OUT_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "trident-unit-test").toString();
+
+ private static final String FILE_NAME_PREFIX = "hdfs-data-";
+ private static final String TEST_TOPOLOGY_NAME = "test-topology";
+ private static final String INDEX_FILE_PREFIX = ".index.";
+ private final TestFileNameFormat fileNameFormat = new TestFileNameFormat();
+
+ private static class TestFileNameFormat implements FileNameFormat {
+ private String currentFileName = "";
+
+ @Override
+ public void prepare(Map conf, int partitionIndex, int numPartitions) {
+
+ }
+
+ @Override
+ public String getName(long rotation, long timeStamp) {
+ currentFileName = FILE_NAME_PREFIX + Long.toString(rotation);
+ return currentFileName;
+ }
+
+ @Override
+ public String getPath() {
+ return TEST_OUT_DIR;
+ }
+
+ public String getCurrentFileName() {
+ return currentFileName;
+ }
+ }
+
+ private HdfsState createHdfsState() {
+
+ Fields hdfsFields = new Fields("f1");
+
+ RecordFormat recordFormat = new DelimitedRecordFormat().withFields(hdfsFields);
+
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+ HdfsState.Options options = new HdfsState.HdfsFileOptions()
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(recordFormat)
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl("file://" + TEST_OUT_DIR);
+
+ Map<String, String> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_NAME, TEST_TOPOLOGY_NAME);
+
+ HdfsState state = new HdfsState(options);
+ state.prepare(conf, null, 0, 1);
+ return state;
+ }
+
+ private List<TridentTuple> createMockTridentTuples(int count) {
+ TridentTuple tuple = mock(TridentTuple.class);
+ when(tuple.getValueByField(any(String.class))).thenReturn("data");
+ List<TridentTuple> tuples = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ tuples.add(tuple);
+ }
+ return tuples;
+ }
+
+ private List<String> getLinesFromCurrentDataFile() throws IOException {
+ Path dataFile = Paths.get(TEST_OUT_DIR, fileNameFormat.getCurrentFileName());
+ List<String> lines = Files.readAllLines(dataFile, Charset.defaultCharset());
+ return lines;
+ }
+
+ @Before
+ public void setUp() {
+ FileUtils.deleteQuietly(new File(TEST_OUT_DIR));
+ }
+
+
+ @Test
+ public void testPrepare() throws Exception {
+ HdfsState state = createHdfsState();
+ Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+ File hdfsDataFile = Paths.get(TEST_OUT_DIR, FILE_NAME_PREFIX + "0").toFile();
+ Assert.assertTrue(files.contains(hdfsDataFile));
+ }
+
+ @Test
+ public void testIndexFileCreation() throws Exception {
+ HdfsState state = createHdfsState();
+ state.beginCommit(1L);
+ Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+ File hdfsIndexFile = Paths.get(TEST_OUT_DIR, INDEX_FILE_PREFIX + TEST_TOPOLOGY_NAME + ".0").toFile();
+ Assert.assertTrue(files.contains(hdfsIndexFile));
+ }
+
+ @Test
+ public void testUpdateState() throws Exception {
+ HdfsState state = createHdfsState();
+ state.beginCommit(1L);
+ int tupleCount = 100;
+ state.updateState(createMockTridentTuples(tupleCount), null);
+ state.commit(1L);
+ state.close();
+ List<String> lines = getLinesFromCurrentDataFile();
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < tupleCount; i++) {
+ expected.add("data");
+ }
+ Assert.assertEquals(tupleCount, lines.size());
+ Assert.assertEquals(expected, lines);
+ }
+
+ @Test
+ public void testRecoverOneBatch() throws Exception {
+ HdfsState state = createHdfsState();
+ // batch 1 is played with 25 tuples initially.
+ state.beginCommit(1L);
+ state.updateState(createMockTridentTuples(25), null);
+ // batch 1 is replayed with 50 tuples.
+ int replayBatchSize = 50;
+ state.beginCommit(1L);
+ state.updateState(createMockTridentTuples(replayBatchSize), null);
+ state.commit(1L);
+ // close the state to force flush
+ state.close();
+ // Ensure that the original batch1 is discarded and new one is persisted.
+ List<String> lines = getLinesFromCurrentDataFile();
+ Assert.assertEquals(replayBatchSize, lines.size());
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < replayBatchSize; i++) {
+ expected.add("data");
+ }
+ Assert.assertEquals(expected, lines);
+ }
+
+ @Test
+ public void testRecoverMultipleBatches() throws Exception {
+ HdfsState state = createHdfsState();
+
+ // batch 1
+ int batch1Count = 10;
+ state.beginCommit(1L);
+ state.updateState(createMockTridentTuples(batch1Count), null);
+ state.commit(1L);
+
+ // batch 2
+ int batch2Count = 20;
+ state.beginCommit(2L);
+ state.updateState(createMockTridentTuples(batch2Count), null);
+ state.commit(2L);
+
+ // batch 3
+ int batch3Count = 30;
+ state.beginCommit(3L);
+ state.updateState(createMockTridentTuples(batch3Count), null);
+ state.commit(3L);
+
+ // batch 3 replayed with 40 tuples
+ int batch3ReplayCount = 40;
+ state.beginCommit(3L);
+ state.updateState(createMockTridentTuples(batch3ReplayCount), null);
+ state.commit(3L);
+ state.close();
+ /*
+ * total tuples should be
+ * recovered (batch-1 + batch-2) + replayed (batch-3)
+ */
+ List<String> lines = getLinesFromCurrentDataFile();
+ int preReplayCount = batch1Count + batch2Count + batch3Count;
+ int expectedTupleCount = batch1Count + batch2Count + batch3ReplayCount;
+
+ Assert.assertNotEquals(preReplayCount, lines.size());
+ Assert.assertEquals(expectedTupleCount, lines.size());
+ }
+}
\ No newline at end of file
[06/12] storm git commit: Merge remote-tracking branch
'upstream/master'
Posted by da...@apache.org.
Merge remote-tracking branch 'upstream/master'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/df63f75f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/df63f75f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/df63f75f
Branch: refs/heads/master
Commit: df63f75f92fe1cdc9368aedc06be1ea91034fd6c
Parents: 8505c3e 6de597a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 3 12:06:44 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 3 12:06:44 2015 +0530
----------------------------------------------------------------------
CHANGELOG.md | 3 +
DEVELOPER.md | 7 +-
docs/documentation/Documentation.md | 4 +-
external/storm-hive/README.md | 1 +
.../org/apache/storm/hive/bolt/HiveBolt.java | 44 +++++---
.../apache/storm/hive/common/HiveWriter.java | 5 +-
.../apache/storm/hive/bolt/TestHiveBolt.java | 100 ++++++++++++++++---
external/storm-redis/README.md | 1 +
log4j2/cluster.xml | 6 +-
log4j2/worker.xml | 6 +-
10 files changed, 135 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
[11/12] storm git commit: Updates for STORM-837
Posted by da...@apache.org.
Updates for STORM-837
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a3fa9b19
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a3fa9b19
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a3fa9b19
Branch: refs/heads/master
Commit: a3fa9b195d1cb3a750fa1506efe017ba393ccff3
Parents: 430375c
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri Aug 14 12:22:17 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri Aug 14 12:22:17 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
README.markdown | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a3fa9b19/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e80ea09..16cf2c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-837: HdfsState ignores commits
* STORM-938: storm-hive add a time interval to flush tuples to hive.
* STORM-977: Incorrect signal (-9) when as-user is true
* STORM-964: Add config (with small default value) for logwriter to restrict its memory usage
http://git-wip-us.apache.org/repos/asf/storm/blob/a3fa9b19/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index a76673f..e965605 100644
--- a/README.markdown
+++ b/README.markdown
@@ -213,6 +213,7 @@ under the License.
* Adrian Seungjin Lee ([@sweetest](https://github.com/sweetest))
* Randy Gelhausen ([@randerzander](https://github.com/randerzander))
* Gabor Liptak ([@gliptak](https://github.com/gliptak))
+* Arun Mahadevan ([@arunmahadevan](https://github.com/arunmahadevan))
## Acknowledgements
[09/12] storm git commit: Refactoring code based on feedback
Posted by da...@apache.org.
Refactoring code based on feedback
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c44e02b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c44e02b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c44e02b3
Branch: refs/heads/master
Commit: c44e02b3818c572924d96ae1dc26bb6ab6df66c7
Parents: 4d8c51b
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Aug 14 11:58:54 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Fri Aug 14 11:58:54 2015 +0530
----------------------------------------------------------------------
.../java/org/apache/storm/hdfs/trident/HdfsState.java | 14 +++++++++-----
.../hdfs/trident/rotation/FileRotationPolicy.java | 6 ++++++
.../hdfs/trident/rotation/FileSizeRotationPolicy.java | 5 +++++
.../storm/hdfs/trident/rotation/NoRotationPolicy.java | 5 +++++
.../hdfs/trident/rotation/TimedRotationPolicy.java | 1 +
5 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 8b29f66..4448868 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -139,9 +139,7 @@ public class HdfsState implements State {
throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
}
- if (this.rotationPolicy instanceof TimedRotationPolicy) {
- ((TimedRotationPolicy) this.rotationPolicy).start();
- }
+ rotationPolicy.start();
}
/**
@@ -455,6 +453,12 @@ public class HdfsState implements State {
}
/**
+ * Returns temp file path corresponding to a file name.
+ */
+ private Path tmpFilePath(String filename) {
+ return new Path(filename + ".tmp");
+ }
+ /**
* Reads the last txn record from the index file if it exists; otherwise
* reads it from the .tmp file, if that exists.
*
@@ -463,7 +467,7 @@ public class HdfsState implements State {
* @throws IOException
*/
private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
- Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
+ Path tmpPath = tmpFilePath(indexFilePath.toString());
if (this.options.fs.exists(indexFilePath)) {
return readTxnRecord(indexFilePath);
} else if (this.options.fs.exists(tmpPath)) {
@@ -489,7 +493,7 @@ public class HdfsState implements State {
FSDataOutputStream out = null;
LOG.debug("Starting index update.");
try {
- Path tmpPath = new Path(this.indexFilePath.toString() + ".tmp");
+ Path tmpPath = tmpFilePath(indexFilePath.toString());
out = this.options.fs.create(tmpPath, true);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset());
http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index 89fd918..f429221 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -55,4 +55,10 @@ public interface FileRotationPolicy extends Serializable {
*
*/
void reset();
+
+ /**
+ * Start the policy. Useful in case of policies like timed rotation
+ * where the timer can be started.
+ */
+ void start();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 79b4f75..fad6455 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -83,6 +83,11 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
this.lastOffset = 0;
}
+ @Override
+ public void start() {
+
+ }
+
public long getMaxBytes() {
return maxBytes;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
index caabee5..8117f95 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
@@ -37,4 +37,9 @@ public class NoRotationPolicy implements FileRotationPolicy {
@Override
public void reset() {
}
+
+ @Override
+ public void start() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index 74e7fab..f8cfe44 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -84,6 +84,7 @@ public class TimedRotationPolicy implements FileRotationPolicy {
/**
* Start the timer to run at fixed intervals.
*/
+ @Override
public void start() {
rotationTimer = new Timer(true);
TimerTask task = new TimerTask() {