You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/08/14 20:01:17 UTC

[01/12] storm git commit: Support for exactly once semantics in HdfsState

Repository: storm
Updated Branches:
  refs/heads/master b8d5635e8 -> aa308e116


Support for exactly once semantics in HdfsState


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5631b9d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5631b9d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5631b9d4

Branch: refs/heads/master
Commit: 5631b9d4746f34127a1bc89cb4488d2b2d8ec9d7
Parents: 6a21b6a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jul 22 00:34:09 2015 +0530
Committer: Arun Iyer <ai...@Arun-Iyer-MBP.local>
Committed: Wed Jul 22 00:41:31 2015 +0530

----------------------------------------------------------------------
 .../apache/storm/hdfs/trident/HdfsState.java    | 336 +++++++++++++++----
 .../trident/rotation/FileRotationPolicy.java    |   8 +
 .../rotation/FileSizeRotationPolicy.java        |   5 +
 .../hdfs/trident/rotation/NoRotationPolicy.java |   5 +
 .../trident/rotation/TimedRotationPolicy.java   |   5 +
 5 files changed, 301 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 67fff88..9b6b48c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -17,14 +17,14 @@
  */
 package org.apache.storm.hdfs.trident;
 
+import backtype.storm.Config;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.topology.FailedException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
@@ -40,8 +40,7 @@ import storm.trident.operation.TridentCollector;
 import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
 import java.net.URI;
 import java.util.*;
 
@@ -60,6 +59,10 @@ public class HdfsState implements State {
         protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
         protected transient Object writeLock;
         protected transient Timer rotationTimer;
+        /**
+         * This is on by default unless TimedRotationPolicy is in use.
+         */
+        private boolean exactlyOnce = true;
 
         abstract void closeOutputFile() throws IOException;
 
@@ -69,17 +72,28 @@ public class HdfsState implements State {
 
         abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
 
-        protected void rotateOutputFile() throws IOException {
+        abstract long getCurrentOffset() throws  IOException;
+
+        abstract void doCommit(Long txId) throws IOException;
+
+        abstract void doRecover(Path srcPath, long nBytes) throws Exception;
+
+        protected boolean isExactlyOnce() {
+            return this.exactlyOnce;
+        }
+
+        protected void rotateOutputFile(boolean doRotateAction) throws IOException {
             LOG.info("Rotating output file...");
             long start = System.currentTimeMillis();
             synchronized (this.writeLock) {
                 closeOutputFile();
                 this.rotation++;
-
                 Path newFile = createOutputFile();
-                LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
-                for (RotationAction action : this.rotationActions) {
-                    action.execute(this.fs, this.currentFile);
+                if (doRotateAction) {
+                    LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
+                    for (RotationAction action : this.rotationActions) {
+                        action.execute(this.fs, this.currentFile);
+                    }
                 }
                 this.currentFile = newFile;
             }
@@ -89,38 +103,49 @@ public class HdfsState implements State {
 
         }
 
-        void prepare(Map conf, int partitionIndex, int numPartitions){
+        protected void rotateOutputFile() throws IOException {
+            rotateOutputFile(true);
+        }
+
+
+        void prepare(Map conf, int partitionIndex, int numPartitions) {
             this.writeLock = new Object();
-            if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
+            if (this.rotationPolicy == null) {
+                throw new IllegalStateException("RotationPolicy must be specified.");
+            } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
+                LOG.warn("*** Exactly once semantics is not supported with TimedRotationPolicy ***");
+                LOG.warn("*** Turning off exactly once.");
+                this.exactlyOnce = false;
+            }
             if (this.fsUrl == null) {
                 throw new IllegalStateException("File system URL must be specified.");
             }
             this.fileNameFormat.prepare(conf, partitionIndex, numPartitions);
             this.hdfsConfig = new Configuration();
-            Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
-            if(map != null){
-                for(String key : map.keySet()){
+            Map<String, Object> map = (Map<String, Object>) conf.get(this.configKey);
+            if (map != null) {
+                for (String key : map.keySet()) {
                     this.hdfsConfig.set(key, String.valueOf(map.get(key)));
                 }
             }
-            try{
+            try {
                 HdfsSecurityUtil.login(conf, hdfsConfig);
                 doPrepare(conf, partitionIndex, numPartitions);
                 this.currentFile = createOutputFile();
 
-            } catch (Exception e){
+            } catch (Exception e) {
                 throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
             }
 
-            if(this.rotationPolicy instanceof TimedRotationPolicy){
-                long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+            if (this.rotationPolicy instanceof TimedRotationPolicy) {
+                long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
                 this.rotationTimer = new Timer(true);
                 TimerTask task = new TimerTask() {
                     @Override
                     public void run() {
                         try {
                             rotateOutputFile();
-                        } catch(IOException e){
+                        } catch (IOException e) {
                             LOG.warn("IOException during scheduled file rotation.", e);
                         }
                     }
@@ -129,6 +154,26 @@ public class HdfsState implements State {
             }
         }
 
+        /**
+         * Copies the first nBytes of srcFile into the new file created
+         * by calling rotateOutputFile, and then deletes srcFile.
+         */
+        private void recover(String srcFile, long nBytes) {
+            try {
+                Path srcPath = new Path(srcFile);
+                if (nBytes > 0) {
+                    rotateOutputFile(false);
+                    this.rotationPolicy.reset();
+                    doRecover(srcPath, nBytes);
+                    LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+                }
+                fs.delete(srcPath, false);
+            } catch (Exception e) {
+                LOG.warn("Recovery failed.", e);
+                throw new RuntimeException(e);
+            }
+        }
+
     }
 
     public static class HdfsFileOptions extends Options {
@@ -137,32 +182,33 @@ public class HdfsState implements State {
         protected RecordFormat format;
         private long offset = 0;
 
-        public HdfsFileOptions withFsUrl(String fsUrl){
+        public HdfsFileOptions withFsUrl(String fsUrl) {
             this.fsUrl = fsUrl;
             return this;
         }
 
-        public HdfsFileOptions withConfigKey(String configKey){
+        public HdfsFileOptions withConfigKey(String configKey) {
             this.configKey = configKey;
             return this;
         }
 
-        public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat){
+        public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat) {
             this.fileNameFormat = fileNameFormat;
             return this;
         }
 
-        public HdfsFileOptions withRecordFormat(RecordFormat format){
+        public HdfsFileOptions withRecordFormat(RecordFormat format) {
             this.format = format;
             return this;
         }
 
-        public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy){
+        public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy) {
             this.rotationPolicy = rotationPolicy;
             return this;
         }
 
-        public HdfsFileOptions addRotationAction(RotationAction action){
+        @Deprecated
+        public HdfsFileOptions addRotationAction(RotationAction action) {
             this.rotationActions.add(action);
             return this;
         }
@@ -174,6 +220,45 @@ public class HdfsState implements State {
         }
 
         @Override
+        public long getCurrentOffset() {
+            return offset;
+        }
+
+        @Override
+        public void doCommit(Long txId) throws IOException {
+            synchronized (writeLock) {
+                if (this.rotationPolicy.mark(this.offset)) {
+                    rotateOutputFile();
+                    this.offset = 0;
+                    this.rotationPolicy.reset();
+                } else {
+                    if (this.out instanceof HdfsDataOutputStream) {
+                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+                    } else {
+                        this.out.hsync();
+                    }
+                }
+            }
+        }
+
+        @Override
+        void doRecover(Path srcPath, long nBytes) throws IOException {
+            this.offset = 0;
+            FSDataInputStream is = this.fs.open(srcPath);
+            copyBytes(is, out, nBytes);
+            this.offset = nBytes;
+        }
+
+        private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
+            byte[] buf = new byte[4096];
+            int n;
+            while ((n = is.read(buf)) != -1 && bytesToCopy > 0) {
+                out.write(buf, 0, (int) Math.min(n, bytesToCopy));
+                bytesToCopy -= n;
+            }
+        }
+
+        @Override
         void closeOutputFile() throws IOException {
             this.out.close();
         }
@@ -187,26 +272,11 @@ public class HdfsState implements State {
 
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
-            boolean rotated = false;
             synchronized (this.writeLock) {
                 for (TridentTuple tuple : tuples) {
                     byte[] bytes = this.format.format(tuple);
                     out.write(bytes);
                     this.offset += bytes.length;
-
-                    if (this.rotationPolicy.mark(tuple, this.offset)) {
-                        rotateOutputFile();
-                        this.offset = 0;
-                        this.rotationPolicy.reset();
-                        rotated = true;
-                    }
-                }
-                if (!rotated) {
-                    if (this.out instanceof HdfsDataOutputStream) {
-                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-                    } else {
-                        this.out.hsync();
-                    }
                 }
             }
         }
@@ -219,7 +289,7 @@ public class HdfsState implements State {
         private String compressionCodec = "default";
         private transient CompressionCodecFactory codecFactory;
 
-        public SequenceFileOptions withCompressionCodec(String codec){
+        public SequenceFileOptions withCompressionCodec(String codec) {
             this.compressionCodec = codec;
             return this;
         }
@@ -229,7 +299,7 @@ public class HdfsState implements State {
             return this;
         }
 
-        public SequenceFileOptions withConfigKey(String configKey){
+        public SequenceFileOptions withConfigKey(String configKey) {
             this.configKey = configKey;
             return this;
         }
@@ -249,12 +319,12 @@ public class HdfsState implements State {
             return this;
         }
 
-        public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType){
+        public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType) {
             this.compressionType = compressionType;
             return this;
         }
 
-        public SequenceFileOptions addRotationAction(RotationAction action){
+        public SequenceFileOptions addRotationAction(RotationAction action) {
             this.rotationActions.add(action);
             return this;
         }
@@ -269,6 +339,36 @@ public class HdfsState implements State {
         }
 
         @Override
+        public long getCurrentOffset() throws IOException {
+            return this.writer.getLength();
+        }
+
+        @Override
+        public void doCommit(Long txId) throws IOException {
+            synchronized (writeLock) {
+                if (this.rotationPolicy.mark(this.writer.getLength())) {
+                    rotateOutputFile();
+                    this.rotationPolicy.reset();
+                } else {
+                    this.writer.hsync();
+                }
+            }
+        }
+
+
+        @Override
+        void doRecover(Path srcPath, long nBytes) throws Exception {
+            SequenceFile.Reader reader = new SequenceFile.Reader(this.hdfsConfig,
+                    SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes));
+
+            Writable key = (Writable) this.format.keyClass().newInstance();
+            Writable value = (Writable) this.format.valueClass().newInstance();
+            while(reader.next(key, value)) {
+                this.writer.append(key, value);
+            }
+        }
+
+        @Override
         Path createOutputFile() throws IOException {
             Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
             this.writer = SequenceFile.createWriter(
@@ -288,45 +388,165 @@ public class HdfsState implements State {
 
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
-            long offset;
-            for(TridentTuple tuple : tuples) {
+            for (TridentTuple tuple : tuples) {
                 synchronized (this.writeLock) {
                     this.writer.append(this.format.key(tuple), this.format.value(tuple));
-                    offset = this.writer.getLength();
-                }
-
-                if (this.rotationPolicy.mark(tuple, offset)) {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
                 }
             }
         }
 
     }
 
+    /**
+     * TxnRecord [txnid, data_file_path, data_file_offset]
+     * <p>
+     * This is written to the index file during beginCommit() and used for recovery.
+     * </p>
+     */
+    private static class TxnRecord {
+        private long txnid;
+        private String dataFilePath;
+        private long offset;
+
+        private TxnRecord(long txnId, String dataFilePath, long offset) {
+            this.txnid = txnId;
+            this.dataFilePath = dataFilePath;
+            this.offset = offset;
+        }
+
+        @Override
+        public String toString() {
+            return Long.toString(txnid) + "," + dataFilePath + "," + Long.toString(offset);
+        }
+    }
+
+
     public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
     private Options options;
+    private volatile TxnRecord lastSeenTxn;
+    private Path indexFilePath;
 
-    HdfsState(Options options){
+    HdfsState(Options options) {
         this.options = options;
     }
 
-    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions){
+    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         this.options.prepare(conf, partitionIndex, numPartitions);
+        if (options.isExactlyOnce()) {
+            initLastTxn(conf, partitionIndex);
+        }
+    }
+
+    private TxnRecord readTxnRecord(Path path) throws IOException {
+        FSDataInputStream inputStream = null;
+        try {
+            inputStream = this.options.fs.open(path);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+            String line;
+            if ((line = reader.readLine()) != null) {
+                String[] fields = line.split(",");
+                return new TxnRecord(Long.valueOf(fields[0]), fields[1], Long.valueOf(fields[2]));
+            }
+        } finally {
+            if (inputStream != null) {
+                inputStream.close();
+            }
+        }
+        return new TxnRecord(0, options.currentFile.toString(), 0);
+    }
+
+    /**
+     * Reads the last txn record from the index file if it exists; otherwise
+     * reads it from the .tmp file, if that exists.
+     *
+     * @param indexFilePath the index file path
+     * @return the txn record from the index file or a default initial record.
+     * @throws IOException
+     */
+    private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
+        Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
+        if (this.options.fs.exists(indexFilePath)) {
+            return readTxnRecord(indexFilePath);
+        } else if (this.options.fs.exists(tmpPath)) {
+            return readTxnRecord(tmpPath);
+        }
+        return new TxnRecord(0, options.currentFile.toString(), 0);
+    }
+
+    private void initLastTxn(Map conf, int partition) {
+        // Include the partition id in the file name so that the index files for different partitions are independent.
+        String indexFileName = String.format(".index.%s.%d", conf.get(Config.TOPOLOGY_NAME), partition);
+        this.indexFilePath = new Path(options.fileNameFormat.getPath(), indexFileName);
+        try {
+            this.lastSeenTxn = getTxnRecord(indexFilePath);
+            LOG.debug("initLastTxn updated lastSeenTxn to [{}]", this.lastSeenTxn);
+        } catch (IOException e) {
+            LOG.warn("initLastTxn failed due to IOException.", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void updateIndex(long txId) {
+        FSDataOutputStream out = null;
+        LOG.debug("Starting index update.");
+        try {
+            Path tmpPath = new Path(this.indexFilePath.toString() + ".tmp");
+            out = this.options.fs.create(tmpPath, true);
+            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
+            TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset());
+            bw.write(txnRecord.toString());
+            bw.newLine();
+            bw.flush();
+            /*
+             * Delete the current index file, then rename the tmp file to replace it. The two steps are not
+             * atomic together; if only the .tmp file is left behind, getTxnRecord falls back to reading it.
+             */
+            options.fs.delete(this.indexFilePath, false);
+            options.fs.rename(tmpPath, this.indexFilePath);
+            lastSeenTxn = txnRecord;
+            LOG.debug("updateIndex updated lastSeenTxn to [{}]", this.lastSeenTxn);
+        } catch (IOException e) {
+            LOG.warn("Begin commit failed due to IOException. Failing batch", e);
+            throw new FailedException(e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    LOG.warn("Begin commit failed due to IOException. Failing batch", e);
+                    throw new FailedException(e);
+                }
+            }
+        }
     }
 
     @Override
     public void beginCommit(Long txId) {
+        if (options.isExactlyOnce()) {
+            if (txId <= lastSeenTxn.txnid) {
+                LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+                long start = System.currentTimeMillis();
+                options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+                LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
+            }
+            updateIndex(txId);
+        }
     }
 
     @Override
     public void commit(Long txId) {
+        try {
+            options.doCommit(txId);
+        } catch (IOException e) {
+            LOG.warn("Commit failed due to IOException. Failing the batch.", e);
+            throw new FailedException(e);
+        }
     }
 
-    public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector){
-        try{
+    public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) {
+        try {
             this.options.execute(tuples);
-        } catch (IOException e){
+        } catch (IOException e) {
             LOG.warn("Failing batch due to IOException.", e);
             throw new FailedException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index 3db56f8..89fd918 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -41,6 +41,14 @@ public interface FileRotationPolicy extends Serializable {
      */
     boolean mark(TridentTuple tuple, long offset);
 
+    /**
+     * Check if a file rotation should be performed based on
+     * the offset at which file is being written.
+     * 
+     * @param offset the current offset of file being written
+     * @return true if a file rotation should be performed.
+     */
+    boolean mark(long offset);
 
     /**
      * Called after the HdfsBolt rotates a file.

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 93f0d58..5d99108 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -66,6 +66,11 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
 
     @Override
     public boolean mark(TridentTuple tuple, long offset) {
+        return mark(offset);
+    }
+
+    @Override
+    public boolean mark(long offset) {
         long diff = offset - this.lastOffset;
         this.currentBytesWritten += diff;
         this.lastOffset = offset;

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
index 50130e5..caabee5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
@@ -30,6 +30,11 @@ public class NoRotationPolicy implements FileRotationPolicy {
     }
 
     @Override
+    public boolean mark(long offset) {
+        return false;
+    }
+
+    @Override
     public void reset() {
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index a75e2e5..f407d1d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -57,6 +57,11 @@ public class TimedRotationPolicy implements FileRotationPolicy {
         return false;
     }
 
+    @Override
+    public boolean mark(long offset) {
+        return false;
+    }
+
     /**
      * Called after the HdfsBolt rotates a file.
      */


[08/12] storm git commit: updated README.md

Posted by da...@apache.org.
updated README.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d8c51b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d8c51b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d8c51b4

Branch: refs/heads/master
Commit: 4d8c51b4503110f03a27b401f096ff2477bbb7cf
Parents: cdabbb3
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 10 14:01:24 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 10 14:01:24 2015 +0530

----------------------------------------------------------------------
 external/storm-hdfs/README.md | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4d8c51b4/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 98fcc55..e1f2b28 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -323,6 +323,9 @@ duplicates from the current data file by copying the data up to the last transac
 operation involves a lot of data copy, ensure that the data files are rotated at reasonable sizes with `FileSizeRotationPolicy` 
 and at reasonable intervals with `TimedRotationPolicy` so that the recovery can complete within topology.message.timeout.secs.
 
+Also note that with `TimedRotationPolicy` the files are never rotated in the middle of a batch, even if the timer ticks; 
+they are rotated only when a batch completes, so that complete batches can be efficiently recovered in case of failures.
+
 ##Working with Secure HDFS
 If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We 
 currently have 2 options to support this:


[03/12] storm git commit: Make the buffer size in file copy an option and set a reasonable default

Posted by da...@apache.org.
Make the buffer size in file copy an option and set a reasonable default


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e71e2ddf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e71e2ddf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e71e2ddf

Branch: refs/heads/master
Commit: e71e2ddfafe8bfb082320c2c3b40f4fb2a3d4995
Parents: 579dc87
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jul 22 12:31:46 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Jul 22 12:31:46 2015 +0530

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hdfs/trident/HdfsState.java   | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e71e2ddf/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 9b9ba8e..8d32b32 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -193,6 +193,7 @@ public class HdfsState implements State {
         private transient FSDataOutputStream out;
         protected RecordFormat format;
         private long offset = 0;
+        private int bufferSize =  131072; // default 128 K
 
         public HdfsFileOptions withFsUrl(String fsUrl) {
             this.fsUrl = fsUrl;
@@ -219,6 +220,11 @@ public class HdfsState implements State {
             return this;
         }
 
+        public HdfsFileOptions withBufferSize(int size) {
+            this.bufferSize = Math.max(4096, size); // at least 4K
+            return this;
+        }
+
         @Deprecated
         public HdfsFileOptions addRotationAction(RotationAction action) {
             this.rotationActions.add(action);
@@ -262,7 +268,7 @@ public class HdfsState implements State {
         }
 
         private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
-            byte[] buf = new byte[4096];
+            byte[] buf = new byte[bufferSize];
             int n;
             while ((n = is.read(buf)) != -1 && bytesToCopy > 0) {
                 out.write(buf, 0, (int) Math.min(n, bytesToCopy));


[10/12] storm git commit: Merge branch 'master' of https://github.com/arunmahadevan/storm into merge-STORM-837

Posted by da...@apache.org.
Merge branch 'master' of https://github.com/arunmahadevan/storm into merge-STORM-837


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/430375c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/430375c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/430375c0

Branch: refs/heads/master
Commit: 430375c01ebfbc0342e84c5068872d13292b8edf
Parents: b8d5635 c44e02b
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri Aug 14 12:15:14 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri Aug 14 12:15:14 2015 -0500

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   |   9 +
 external/storm-hdfs/pom.xml                     |  11 +
 .../apache/storm/hdfs/trident/HdfsState.java    | 392 +++++++++++++++----
 .../trident/rotation/FileRotationPolicy.java    |  14 +
 .../rotation/FileSizeRotationPolicy.java        |  13 +
 .../hdfs/trident/rotation/NoRotationPolicy.java |  10 +
 .../trident/rotation/TimedRotationPolicy.java   |  31 +-
 .../storm/hdfs/trident/HdfsStateTest.java       | 206 ++++++++++
 8 files changed, 602 insertions(+), 84 deletions(-)
----------------------------------------------------------------------



[12/12] storm git commit: Merge STORM-837

Posted by da...@apache.org.
Merge STORM-837


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aa308e11
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aa308e11
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aa308e11

Branch: refs/heads/master
Commit: aa308e11685b8b7945f2d4963e9f63dfd322087b
Parents: b8d5635 a3fa9b1
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri Aug 14 12:57:53 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri Aug 14 12:57:53 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 README.markdown                                 |   1 +
 external/storm-hdfs/README.md                   |   9 +
 external/storm-hdfs/pom.xml                     |  11 +
 .../apache/storm/hdfs/trident/HdfsState.java    | 392 +++++++++++++++----
 .../trident/rotation/FileRotationPolicy.java    |  14 +
 .../rotation/FileSizeRotationPolicy.java        |  13 +
 .../hdfs/trident/rotation/NoRotationPolicy.java |  10 +
 .../trident/rotation/TimedRotationPolicy.java   |  31 +-
 .../storm/hdfs/trident/HdfsStateTest.java       | 206 ++++++++++
 10 files changed, 604 insertions(+), 84 deletions(-)
----------------------------------------------------------------------



[05/12] storm git commit: Added doc to explain when exactly once semantics is supported; Auto disable exactly once if file size > 1GB.

Posted by da...@apache.org.
Added doc to explain when exactly once semantics is supported; Auto disable exactly once if file size > 1GB.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8505c3ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8505c3ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8505c3ef

Branch: refs/heads/master
Commit: 8505c3ef2d5e32c1716f49f6928851363c25df91
Parents: 85eadd7
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 3 12:02:52 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 3 12:02:52 2015 +0530

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   |  9 +++++++
 .../apache/storm/hdfs/trident/HdfsState.java    | 26 +++++++++++++++++---
 .../rotation/FileSizeRotationPolicy.java        |  3 +++
 3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index b37af7e..bb4a618 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -317,6 +317,15 @@ that of the bolts.
                 .addRotationAction(new MoveFileAction().toDestination("/dest2/"));
 ```
 
+### Note
+Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes 
+duplicates from the current data file by copying the data up to the last transaction to another file . Since this 
+operation involves a lot of data copy, the exactly once semantics is enabled only if `FileSizeRotationPolicy` with 
+file size less than 1 GB is specified.
+
+The exactly once semantics is automatically disabled if `FileSizeRotationPolicy` with size greater than 1 GB or
+`TimedRotationPolicy` is in use.
+
 ##Working with Secure HDFS
 If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We 
 currently have 2 options to support this:

http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 8d32b32..3dc566c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -35,6 +35,7 @@ import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.format.SequenceFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
 import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,9 +125,16 @@ public class HdfsState implements State {
             this.writeLock = new Object();
             if (this.rotationPolicy == null) {
                 throw new IllegalStateException("RotationPolicy must be specified.");
+            } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
+                long limit = FileSizeRotationPolicy.Units.GB.getByteCount();
+                if(((FileSizeRotationPolicy) rotationPolicy).getMaxBytes() > limit) {
+                    LOG.warn("*** Exactly once semantics is not supported for FileSizeRotationPolicy with size > 1 GB ***");
+                    LOG.warn("Turning off exactly once.");
+                    this.exactlyOnce = false;
+                }
             } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                LOG.warn("*** Exactly once semantics is not supported with TimedRotationPolicy ***");
-                LOG.warn("*** Turning off exactly once.");
+                LOG.warn("*** Exactly once semantics is not supported for TimedRotationPolicy ***");
+                LOG.warn("Turning off exactly once.");
                 this.exactlyOnce = false;
             }
             if (this.fsUrl == null) {
@@ -220,8 +228,18 @@ public class HdfsState implements State {
             return this;
         }
 
-        public HdfsFileOptions withBufferSize(int size) {
-            this.bufferSize = Math.max(4096, size); // at least 4K
+        /**
+         * <p>Set the size of the buffer used for hdfs file copy in case of recovery. The default
+         * value is 131072.</p>
+         *
+         * <p> Note: The lower limit for the parameter is 4096, below which the
+         * option is ignored. </p>
+         *
+         * @param sizeInBytes the buffer size in bytes
+         * @return {@link HdfsFileOptions}
+         */
+        public HdfsFileOptions withBufferSize(int sizeInBytes) {
+            this.bufferSize = Math.max(4096, sizeInBytes); // at least 4K
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 5d99108..79b4f75 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -83,4 +83,7 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
         this.lastOffset = 0;
     }
 
+    public long getMaxBytes() {
+        return maxBytes;
+    }
 }


[02/12] storm git commit: Removing wildcard imports

Posted by da...@apache.org.
Removing wildcard imports


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/579dc87f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/579dc87f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/579dc87f

Branch: refs/heads/master
Commit: 579dc87f04cf24fc9419945d54519fce09584178
Parents: 5631b9d
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jul 22 10:14:24 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Jul 22 10:14:43 2015 +0530

----------------------------------------------------------------------
 .../apache/storm/hdfs/trident/HdfsState.java    | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/579dc87f/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 9b6b48c..9b9ba8e 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -21,7 +21,10 @@ import backtype.storm.Config;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.topology.FailedException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -32,7 +35,6 @@ import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.format.SequenceFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
-
 import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,9 +42,19 @@ import storm.trident.operation.TridentCollector;
 import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
 import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 public class HdfsState implements State {
 


[04/12] storm git commit: Merge remote-tracking branch 'upstream/master'

Posted by da...@apache.org.
Merge remote-tracking branch 'upstream/master'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85eadd79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85eadd79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85eadd79

Branch: refs/heads/master
Commit: 85eadd790198a506a020fb4d1fd3247741b7db64
Parents: e71e2dd 9b2fd72
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Jul 24 09:09:51 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Fri Jul 24 09:09:51 2015 +0530

----------------------------------------------------------------------
 CHANGELOG.md                                    |  1 +
 .../src/clj/backtype/storm/daemon/task.clj      |  1 -
 storm-core/src/clj/backtype/storm/tuple.clj     |  6 ++-
 .../storm/grouping/PartialKeyGrouping.java      | 27 +++++++++++-
 .../storm/testing/TestWordBytesCounter.java     | 27 ++++++++++++
 .../backtype/storm/testing/TestWordCounter.java |  6 ++-
 .../test/clj/backtype/storm/grouping_test.clj   | 43 +++++++++++++-------
 7 files changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------



[07/12] storm git commit: Support exactly once always; added unit tests.

Posted by da...@apache.org.
Support exactly once always; added unit tests.

1. Enable exactly once irrespective of the rotation policy in use.
2. For TimedRotationPolicy set a flag and do the actual rotation in doCommit.
3. Added unit tests and updated README.md as per the new behavior.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cdabbb3d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cdabbb3d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cdabbb3d

Branch: refs/heads/master
Commit: cdabbb3db273895a79939f40894b999167532125
Parents: df63f75
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 10 13:09:53 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 10 13:09:53 2015 +0530

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   |   9 +-
 external/storm-hdfs/pom.xml                     |  11 +
 .../apache/storm/hdfs/trident/HdfsState.java    | 142 +++++--------
 .../trident/rotation/TimedRotationPolicy.java   |  27 ++-
 .../storm/hdfs/trident/HdfsStateTest.java       | 206 +++++++++++++++++++
 5 files changed, 299 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index bb4a618..98fcc55 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -319,12 +319,9 @@ that of the bolts.
 
 ### Note
 Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes 
-duplicates from the current data file by copying the data up to the last transaction to another file . Since this 
-operation involves a lot of data copy, the exactly once semantics is enabled only if `FileSizeRotationPolicy` with 
-file size less than 1 GB is specified.
-
-The exactly once semantics is automatically disabled if `FileSizeRotationPolicy` with size greater than 1 GB or
-`TimedRotationPolicy` is in use.
+duplicates from the current data file by copying the data up to the last transaction to another file. Since this 
+operation involves a lot of data copy, ensure that the data files are rotated at reasonable sizes with `FileSizeRotationPolicy` 
+and at reasonable intervals with `TimedRotationPolicy` so that the recovery can complete within topology.message.timeout.secs.
 
 ##Working with Secure HDFS
 If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We 

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 7e927bf..15e6f72 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -68,6 +68,17 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
       <plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 3dc566c..8b29f66 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -54,8 +54,6 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 public class HdfsState implements State {
 
@@ -70,12 +68,7 @@ public class HdfsState implements State {
         protected int rotation = 0;
         protected transient Configuration hdfsConfig;
         protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
-        protected transient Object writeLock;
-        protected transient Timer rotationTimer;
-        /**
-         * This is on by default unless TimedRotationPolicy is in use.
-         */
-        private boolean exactlyOnce = true;
+
 
         abstract void closeOutputFile() throws IOException;
 
@@ -91,29 +84,21 @@ public class HdfsState implements State {
 
         abstract void doRecover(Path srcPath, long nBytes) throws Exception;
 
-        protected boolean isExactlyOnce() {
-            return this.exactlyOnce;
-        }
-
         protected void rotateOutputFile(boolean doRotateAction) throws IOException {
             LOG.info("Rotating output file...");
             long start = System.currentTimeMillis();
-            synchronized (this.writeLock) {
-                closeOutputFile();
-                this.rotation++;
-                Path newFile = createOutputFile();
-                if (doRotateAction) {
-                    LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
-                    for (RotationAction action : this.rotationActions) {
-                        action.execute(this.fs, this.currentFile);
-                    }
+            closeOutputFile();
+            this.rotation++;
+            Path newFile = createOutputFile();
+            if (doRotateAction) {
+                LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
+                for (RotationAction action : this.rotationActions) {
+                    action.execute(this.fs, this.currentFile);
                 }
-                this.currentFile = newFile;
             }
+            this.currentFile = newFile;
             long time = System.currentTimeMillis() - start;
             LOG.info("File rotation took {} ms.", time);
-
-
         }
 
         protected void rotateOutputFile() throws IOException {
@@ -122,20 +107,17 @@ public class HdfsState implements State {
 
 
         void prepare(Map conf, int partitionIndex, int numPartitions) {
-            this.writeLock = new Object();
             if (this.rotationPolicy == null) {
                 throw new IllegalStateException("RotationPolicy must be specified.");
             } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
-                long limit = FileSizeRotationPolicy.Units.GB.getByteCount();
-                if(((FileSizeRotationPolicy) rotationPolicy).getMaxBytes() > limit) {
-                    LOG.warn("*** Exactly once semantics is not supported for FileSizeRotationPolicy with size > 1 GB ***");
-                    LOG.warn("Turning off exactly once.");
-                    this.exactlyOnce = false;
-                }
+                long rotationBytes = ((FileSizeRotationPolicy) rotationPolicy).getMaxBytes();
+                LOG.warn("FileSizeRotationPolicy specified with {} bytes.", rotationBytes);
+                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+                LOG.warn("Ensure that the data files does not grow too big with the FileSizeRotationPolicy.");
             } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                LOG.warn("*** Exactly once semantics is not supported for TimedRotationPolicy ***");
-                LOG.warn("Turning off exactly once.");
-                this.exactlyOnce = false;
+                LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
+                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+                LOG.warn("Ensure that the data files does not grow too big with the TimedRotationPolicy.");
             }
             if (this.fsUrl == null) {
                 throw new IllegalStateException("File system URL must be specified.");
@@ -158,19 +140,7 @@ public class HdfsState implements State {
             }
 
             if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
-                this.rotationTimer = new Timer(true);
-                TimerTask task = new TimerTask() {
-                    @Override
-                    public void run() {
-                        try {
-                            rotateOutputFile();
-                        } catch (IOException e) {
-                            LOG.warn("IOException during scheduled file rotation.", e);
-                        }
-                    }
-                };
-                this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+                ((TimedRotationPolicy) this.rotationPolicy).start();
             }
         }
 
@@ -181,13 +151,16 @@ public class HdfsState implements State {
         private void recover(String srcFile, long nBytes) {
             try {
                 Path srcPath = new Path(srcFile);
+                rotateOutputFile(false);
+                this.rotationPolicy.reset();
                 if (nBytes > 0) {
-                    rotateOutputFile(false);
-                    this.rotationPolicy.reset();
                     doRecover(srcPath, nBytes);
                     LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+                } else {
+                    LOG.info("Nothing to recover from {}", srcFile);
                 }
                 fs.delete(srcPath, false);
+                LOG.info("Deleted file {} that had partial commits.", srcFile);
             } catch (Exception e) {
                 LOG.warn("Recovery failed.", e);
                 throw new RuntimeException(e);
@@ -251,7 +224,7 @@ public class HdfsState implements State {
 
         @Override
         void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
-            LOG.info("Preparing HDFS Bolt...");
+            LOG.info("Preparing HDFS File state...");
             this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
         }
 
@@ -262,17 +235,15 @@ public class HdfsState implements State {
 
         @Override
         public void doCommit(Long txId) throws IOException {
-            synchronized (writeLock) {
-                if (this.rotationPolicy.mark(this.offset)) {
-                    rotateOutputFile();
-                    this.offset = 0;
-                    this.rotationPolicy.reset();
+            if (this.rotationPolicy.mark(this.offset)) {
+                rotateOutputFile();
+                this.offset = 0;
+                this.rotationPolicy.reset();
+            } else {
+                if (this.out instanceof HdfsDataOutputStream) {
+                    ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                 } else {
-                    if (this.out instanceof HdfsDataOutputStream) {
-                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-                    } else {
-                        this.out.hsync();
-                    }
+                    this.out.hsync();
                 }
             }
         }
@@ -308,12 +279,10 @@ public class HdfsState implements State {
 
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
-            synchronized (this.writeLock) {
-                for (TridentTuple tuple : tuples) {
-                    byte[] bytes = this.format.format(tuple);
-                    out.write(bytes);
-                    this.offset += bytes.length;
-                }
+            for (TridentTuple tuple : tuples) {
+                byte[] bytes = this.format.format(tuple);
+                out.write(bytes);
+                this.offset += bytes.length;
             }
         }
     }
@@ -381,13 +350,11 @@ public class HdfsState implements State {
 
         @Override
         public void doCommit(Long txId) throws IOException {
-            synchronized (writeLock) {
-                if (this.rotationPolicy.mark(this.writer.getLength())) {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
-                } else {
-                    this.writer.hsync();
-                }
+            if (this.rotationPolicy.mark(this.writer.getLength())) {
+                rotateOutputFile();
+                this.rotationPolicy.reset();
+            } else {
+                this.writer.hsync();
             }
         }
 
@@ -425,9 +392,7 @@ public class HdfsState implements State {
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
             for (TridentTuple tuple : tuples) {
-                synchronized (this.writeLock) {
-                    this.writer.append(this.format.key(tuple), this.format.value(tuple));
-                }
+                this.writer.append(this.format.key(tuple), this.format.value(tuple));
             }
         }
 
@@ -468,9 +433,7 @@ public class HdfsState implements State {
 
     void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         this.options.prepare(conf, partitionIndex, numPartitions);
-        if (options.isExactlyOnce()) {
-            initLastTxn(conf, partitionIndex);
-        }
+        initLastTxn(conf, partitionIndex);
     }
 
     private TxnRecord readTxnRecord(Path path) throws IOException {
@@ -558,15 +521,13 @@ public class HdfsState implements State {
 
     @Override
     public void beginCommit(Long txId) {
-        if (options.isExactlyOnce()) {
-            if (txId <= lastSeenTxn.txnid) {
-                LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
-                long start = System.currentTimeMillis();
-                options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
-                LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
-            }
-            updateIndex(txId);
+        if (txId <= lastSeenTxn.txnid) {
+            LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+            long start = System.currentTimeMillis();
+            options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+            LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
         }
+        updateIndex(txId);
     }
 
     @Override
@@ -587,4 +548,11 @@ public class HdfsState implements State {
             throw new FailedException(e);
         }
     }
+
+    /**
+     * for unit tests
+     */
+    void close() throws IOException {
+        this.options.closeOutputFile();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index f407d1d..74e7fab 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -19,6 +19,10 @@ package org.apache.storm.hdfs.trident.rotation;
 
 import storm.trident.tuple.TridentTuple;
 
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 
 public class TimedRotationPolicy implements FileRotationPolicy {
 
@@ -41,6 +45,9 @@ public class TimedRotationPolicy implements FileRotationPolicy {
     }
 
     private long interval;
+    private Timer rotationTimer;
+    private AtomicBoolean rotationTimerTriggered = new AtomicBoolean();
+
 
     public TimedRotationPolicy(float count, TimeUnit units){
         this.interval = (long)(count * units.getMilliSeconds());
@@ -54,12 +61,12 @@ public class TimedRotationPolicy implements FileRotationPolicy {
      */
     @Override
     public boolean mark(TridentTuple tuple, long offset) {
-        return false;
+        return rotationTimerTriggered.get();
     }
 
     @Override
     public boolean mark(long offset) {
-        return false;
+        return rotationTimerTriggered.get();
     }
 
     /**
@@ -67,10 +74,24 @@ public class TimedRotationPolicy implements FileRotationPolicy {
      */
     @Override
     public void reset() {
-
+        rotationTimerTriggered.set(false);
     }
 
     public long getInterval(){
         return this.interval;
     }
+
+    /**
+     * Start the timer to run at fixed intervals.
+     */
+    public void start() {
+        rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                rotationTimerTriggered.set(true);
+            }
+        };
+        rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
new file mode 100644
index 0000000..e1f45df
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
@@ -0,0 +1,206 @@
+package org.apache.storm.hdfs.trident;
+
+import backtype.storm.Config;
+import backtype.storm.tuple.Fields;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class HdfsStateTest {
+
+    private static final String TEST_OUT_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "trident-unit-test").toString();
+
+    private static final String FILE_NAME_PREFIX = "hdfs-data-";
+    private static final String TEST_TOPOLOGY_NAME = "test-topology";
+    private static final String INDEX_FILE_PREFIX = ".index.";
+    private final TestFileNameFormat fileNameFormat = new TestFileNameFormat();
+
+    private static class TestFileNameFormat implements FileNameFormat {
+        private String currentFileName = "";
+
+        @Override
+        public void prepare(Map conf, int partitionIndex, int numPartitions) {
+
+        }
+
+        @Override
+        public String getName(long rotation, long timeStamp) {
+            currentFileName = FILE_NAME_PREFIX + Long.toString(rotation);
+            return currentFileName;
+        }
+
+        @Override
+        public String getPath() {
+            return TEST_OUT_DIR;
+        }
+
+        public String getCurrentFileName() {
+            return currentFileName;
+        }
+    }
+
+    private HdfsState createHdfsState() {
+
+        Fields hdfsFields = new Fields("f1");
+
+        RecordFormat recordFormat = new DelimitedRecordFormat().withFields(hdfsFields);
+
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+        HdfsState.Options options = new HdfsState.HdfsFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(recordFormat)
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("file://" + TEST_OUT_DIR);
+
+        Map<String, String> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_NAME, TEST_TOPOLOGY_NAME);
+
+        HdfsState state = new HdfsState(options);
+        state.prepare(conf, null, 0, 1);
+        return state;
+    }
+
+    private List<TridentTuple> createMockTridentTuples(int count) {
+        TridentTuple tuple = mock(TridentTuple.class);
+        when(tuple.getValueByField(any(String.class))).thenReturn("data");
+        List<TridentTuple> tuples = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            tuples.add(tuple);
+        }
+        return tuples;
+    }
+
+    private List<String> getLinesFromCurrentDataFile() throws IOException {
+        Path dataFile = Paths.get(TEST_OUT_DIR, fileNameFormat.getCurrentFileName());
+        List<String> lines = Files.readAllLines(dataFile, Charset.defaultCharset());
+        return lines;
+    }
+
+    @Before
+    public void setUp() {
+        FileUtils.deleteQuietly(new File(TEST_OUT_DIR));
+    }
+
+
+    @Test
+    public void testPrepare() throws Exception {
+        HdfsState state = createHdfsState();
+        Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+        File hdfsDataFile = Paths.get(TEST_OUT_DIR, FILE_NAME_PREFIX + "0").toFile();
+        Assert.assertTrue(files.contains(hdfsDataFile));
+    }
+
+    @Test
+    public void testIndexFileCreation() throws Exception {
+        HdfsState state = createHdfsState();
+        state.beginCommit(1L);
+        Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+        File hdfsIndexFile = Paths.get(TEST_OUT_DIR, INDEX_FILE_PREFIX + TEST_TOPOLOGY_NAME + ".0").toFile();
+        Assert.assertTrue(files.contains(hdfsIndexFile));
+    }
+
+    @Test
+    public void testUpdateState() throws Exception {
+        HdfsState state = createHdfsState();
+        state.beginCommit(1L);
+        int tupleCount = 100;
+        state.updateState(createMockTridentTuples(tupleCount), null);
+        state.commit(1L);
+        state.close();
+        List<String> lines = getLinesFromCurrentDataFile();
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < tupleCount; i++) {
+            expected.add("data");
+        }
+        Assert.assertEquals(tupleCount, lines.size());
+        Assert.assertEquals(expected, lines);
+    }
+
+    @Test
+    public void testRecoverOneBatch() throws Exception {
+        HdfsState state = createHdfsState();
+        // batch 1 is played with 25 tuples initially.
+        state.beginCommit(1L);
+        state.updateState(createMockTridentTuples(25), null);
+        // batch 1 is replayed with 50 tuples.
+        int replayBatchSize = 50;
+        state.beginCommit(1L);
+        state.updateState(createMockTridentTuples(replayBatchSize), null);
+        state.commit(1L);
+        // close the state to force flush
+        state.close();
+        // Ensure that the original batch 1 is discarded and the new one is persisted.
+        List<String> lines = getLinesFromCurrentDataFile();
+        Assert.assertEquals(replayBatchSize, lines.size());
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < replayBatchSize; i++) {
+            expected.add("data");
+        }
+        Assert.assertEquals(expected, lines);
+    }
+
+    @Test
+    public void testRecoverMultipleBatches() throws Exception {
+        HdfsState state = createHdfsState();
+
+        // batch 1
+        int batch1Count = 10;
+        state.beginCommit(1L);
+        state.updateState(createMockTridentTuples(batch1Count), null);
+        state.commit(1L);
+
+        // batch 2
+        int batch2Count = 20;
+        state.beginCommit(2L);
+        state.updateState(createMockTridentTuples(batch2Count), null);
+        state.commit(2L);
+
+        // batch 3
+        int batch3Count = 30;
+        state.beginCommit(3L);
+        state.updateState(createMockTridentTuples(batch3Count), null);
+        state.commit(3L);
+
+        // batch 3 replayed with 40 tuples
+        int batch3ReplayCount = 40;
+        state.beginCommit(3L);
+        state.updateState(createMockTridentTuples(batch3ReplayCount), null);
+        state.commit(3L);
+        state.close();
+        /*
+         * total tuples should be
+         * recovered (batch-1 + batch-2) + replayed (batch-3)
+        */
+        List<String> lines = getLinesFromCurrentDataFile();
+        int preReplayCount = batch1Count + batch2Count + batch3Count;
+        int expectedTupleCount = batch1Count + batch2Count + batch3ReplayCount;
+
+        Assert.assertNotEquals(preReplayCount, lines.size());
+        Assert.assertEquals(expectedTupleCount, lines.size());
+    }
+}
\ No newline at end of file


[06/12] storm git commit: Merge remote-tracking branch 'upstream/master'

Posted by da...@apache.org.
Merge remote-tracking branch 'upstream/master'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/df63f75f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/df63f75f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/df63f75f

Branch: refs/heads/master
Commit: df63f75f92fe1cdc9368aedc06be1ea91034fd6c
Parents: 8505c3e 6de597a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 3 12:06:44 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 3 12:06:44 2015 +0530

----------------------------------------------------------------------
 CHANGELOG.md                                    |   3 +
 DEVELOPER.md                                    |   7 +-
 docs/documentation/Documentation.md             |   4 +-
 external/storm-hive/README.md                   |   1 +
 .../org/apache/storm/hive/bolt/HiveBolt.java    |  44 +++++---
 .../apache/storm/hive/common/HiveWriter.java    |   5 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 100 ++++++++++++++++---
 external/storm-redis/README.md                  |   1 +
 log4j2/cluster.xml                              |   6 +-
 log4j2/worker.xml                               |   6 +-
 10 files changed, 135 insertions(+), 42 deletions(-)
----------------------------------------------------------------------



[11/12] storm git commit: Updates for STORM-837

Posted by da...@apache.org.
Updates for STORM-837


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a3fa9b19
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a3fa9b19
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a3fa9b19

Branch: refs/heads/master
Commit: a3fa9b195d1cb3a750fa1506efe017ba393ccff3
Parents: 430375c
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri Aug 14 12:22:17 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri Aug 14 12:22:17 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md    | 1 +
 README.markdown | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a3fa9b19/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e80ea09..16cf2c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-837: HdfsState ignores commits
  * STORM-938: storm-hive add a time interval to flush tuples to hive.
  * STORM-977: Incorrect signal (-9) when as-user is true
  * STORM-964: Add config (with small default value) for logwriter to restrict its memory usage

http://git-wip-us.apache.org/repos/asf/storm/blob/a3fa9b19/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index a76673f..e965605 100644
--- a/README.markdown
+++ b/README.markdown
@@ -213,6 +213,7 @@ under the License.
 * Adrian Seungjin Lee ([@sweetest](https://github.com/sweetest))
 * Randy Gelhausen ([@randerzander](https://github.com/randerzander))
 * Gabor Liptak ([@gliptak](https://github.com/gliptak))
+* Arun Mahadevan ([@arunmahadevan](https://github.com/arunmahadevan))
 
 ## Acknowledgements
 


[09/12] storm git commit: Refactoring code based on feedback

Posted by da...@apache.org.
Refactoring code based on feedback


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c44e02b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c44e02b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c44e02b3

Branch: refs/heads/master
Commit: c44e02b3818c572924d96ae1dc26bb6ab6df66c7
Parents: 4d8c51b
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Aug 14 11:58:54 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Fri Aug 14 11:58:54 2015 +0530

----------------------------------------------------------------------
 .../java/org/apache/storm/hdfs/trident/HdfsState.java | 14 +++++++++-----
 .../hdfs/trident/rotation/FileRotationPolicy.java     |  6 ++++++
 .../hdfs/trident/rotation/FileSizeRotationPolicy.java |  5 +++++
 .../storm/hdfs/trident/rotation/NoRotationPolicy.java |  5 +++++
 .../hdfs/trident/rotation/TimedRotationPolicy.java    |  1 +
 5 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 8b29f66..4448868 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -139,9 +139,7 @@ public class HdfsState implements State {
                 throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
             }
 
-            if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                ((TimedRotationPolicy) this.rotationPolicy).start();
-            }
+            rotationPolicy.start();
         }
 
         /**
@@ -455,6 +453,12 @@ public class HdfsState implements State {
     }
 
     /**
+     * Returns temp file path corresponding to a file name.
+     */
+    private Path tmpFilePath(String filename) {
+        return new Path(filename + ".tmp");
+    }
+    /**
      * Reads the last txn record from index file if it exists, if not
      * from .tmp file if exists.
      *
@@ -463,7 +467,7 @@ public class HdfsState implements State {
      * @throws IOException
      */
     private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
-        Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
+        Path tmpPath = tmpFilePath(indexFilePath.toString());
         if (this.options.fs.exists(indexFilePath)) {
             return readTxnRecord(indexFilePath);
         } else if (this.options.fs.exists(tmpPath)) {
@@ -489,7 +493,7 @@ public class HdfsState implements State {
         FSDataOutputStream out = null;
         LOG.debug("Starting index update.");
         try {
-            Path tmpPath = new Path(this.indexFilePath.toString() + ".tmp");
+            Path tmpPath = tmpFilePath(indexFilePath.toString());
             out = this.options.fs.create(tmpPath, true);
             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
             TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset());

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index 89fd918..f429221 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -55,4 +55,10 @@ public interface FileRotationPolicy extends Serializable {
      *
      */
     void reset();
+
+    /**
+     * Start the policy. Useful for policies such as timed rotation,
+     * which must start a timer before they can trigger a rotation.
+     */
+    void start();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 79b4f75..fad6455 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -83,6 +83,11 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
         this.lastOffset = 0;
     }
 
+    @Override
+    public void start() {
+
+    }
+
     public long getMaxBytes() {
         return maxBytes;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
index caabee5..8117f95 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
@@ -37,4 +37,9 @@ public class NoRotationPolicy implements FileRotationPolicy {
     @Override
     public void reset() {
     }
+
+    @Override
+    public void start() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index 74e7fab..f8cfe44 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -84,6 +84,7 @@ public class TimedRotationPolicy implements FileRotationPolicy {
     /**
      * Start the timer to run at fixed intervals.
      */
+    @Override
     public void start() {
         rotationTimer = new Timer(true);
         TimerTask task = new TimerTask() {