Posted to commits@storm.apache.org by da...@apache.org on 2015/08/14 20:01:23 UTC

[07/12] storm git commit: Support exactly once always; added unit tests.

Support exactly once always; added unit tests.

1. Enable exactly once irrespective of the rotation policy in use.
2. For TimedRotationPolicy, set a flag when the timer fires and do the actual rotation in doCommit.
3. Added unit tests and updated README.md to reflect the new behavior.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cdabbb3d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cdabbb3d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cdabbb3d

Branch: refs/heads/master
Commit: cdabbb3db273895a79939f40894b999167532125
Parents: df63f75
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 10 13:09:53 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 10 13:09:53 2015 +0530

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   |   9 +-
 external/storm-hdfs/pom.xml                     |  11 +
 .../apache/storm/hdfs/trident/HdfsState.java    | 142 +++++--------
 .../trident/rotation/TimedRotationPolicy.java   |  27 ++-
 .../storm/hdfs/trident/HdfsStateTest.java       | 206 +++++++++++++++++++
 5 files changed, 299 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index bb4a618..98fcc55 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -319,12 +319,9 @@ that of the bolts.
 
 ### Note
 Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes 
-duplicates from the current data file by copying the data up to the last transaction to another file . Since this 
-operation involves a lot of data copy, the exactly once semantics is enabled only if `FileSizeRotationPolicy` with 
-file size less than 1 GB is specified.
-
-The exactly once semantics is automatically disabled if `FileSizeRotationPolicy` with size greater than 1 GB or
-`TimedRotationPolicy` is in use.
+duplicates from the current data file by copying the data up to the last transaction to another file. Since this 
+operation involves a lot of data copying, ensure that data files are rotated at reasonable sizes with `FileSizeRotationPolicy` 
+and at reasonable intervals with `TimedRotationPolicy` so that recovery can complete within `topology.message.timeout.secs`.
 
 ##Working with Secure HDFS
 If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We 
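
For context, a minimal sketch of wiring rotation policies into an HdfsState as the
note above recommends. It follows the builder pattern used by the HdfsStateTest
added in this commit; DefaultFileNameFormat and HdfsStateFactory are assumed from
the existing storm-hdfs trident API, and the path and fs URL are placeholders:

    import backtype.storm.tuple.Fields;
    import org.apache.storm.hdfs.trident.HdfsState;
    import org.apache.storm.hdfs.trident.HdfsStateFactory;
    import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.trident.format.FileNameFormat;
    import org.apache.storm.hdfs.trident.format.RecordFormat;
    import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
    import storm.trident.state.StateFactory;

    public class RotationConfigSketch {
        public static StateFactory hdfsStateFactory() {
            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/trident")              // placeholder output path
                    .withPrefix("trident")
                    .withExtension(".txt");
            RecordFormat recordFormat = new DelimitedRecordFormat()
                    .withFields(new Fields("sentence"));
            // Keep rotation sizes modest: on batch replay the state copies the data
            // file up to the last committed offset, and that copy must finish within
            // topology.message.timeout.secs.
            FileRotationPolicy rotationPolicy =
                    new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
            HdfsState.Options options = new HdfsState.HdfsFileOptions()
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(recordFormat)
                    .withRotationPolicy(rotationPolicy)
                    .withFsUrl("hdfs://localhost:8020"); // placeholder namenode URL
            return new HdfsStateFactory().withOptions(options);
        }
    }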

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 7e927bf..15e6f72 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -68,6 +68,17 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
       <plugins>
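
With these test-scoped dependencies in place, the new suite can be run for just this
module with standard Maven (module path taken from the diff above):

    mvn -pl external/storm-hdfs test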

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 3dc566c..8b29f66 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -54,8 +54,6 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 public class HdfsState implements State {
 
@@ -70,12 +68,7 @@ public class HdfsState implements State {
         protected int rotation = 0;
         protected transient Configuration hdfsConfig;
         protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
-        protected transient Object writeLock;
-        protected transient Timer rotationTimer;
-        /**
-         * This is on by default unless TimedRotationPolicy is in use.
-         */
-        private boolean exactlyOnce = true;
+
 
         abstract void closeOutputFile() throws IOException;
 
@@ -91,29 +84,21 @@ public class HdfsState implements State {
 
         abstract void doRecover(Path srcPath, long nBytes) throws Exception;
 
-        protected boolean isExactlyOnce() {
-            return this.exactlyOnce;
-        }
-
         protected void rotateOutputFile(boolean doRotateAction) throws IOException {
             LOG.info("Rotating output file...");
             long start = System.currentTimeMillis();
-            synchronized (this.writeLock) {
-                closeOutputFile();
-                this.rotation++;
-                Path newFile = createOutputFile();
-                if (doRotateAction) {
-                    LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
-                    for (RotationAction action : this.rotationActions) {
-                        action.execute(this.fs, this.currentFile);
-                    }
+            closeOutputFile();
+            this.rotation++;
+            Path newFile = createOutputFile();
+            if (doRotateAction) {
+                LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
+                for (RotationAction action : this.rotationActions) {
+                    action.execute(this.fs, this.currentFile);
                 }
-                this.currentFile = newFile;
             }
+            this.currentFile = newFile;
             long time = System.currentTimeMillis() - start;
             LOG.info("File rotation took {} ms.", time);
-
-
         }
 
         protected void rotateOutputFile() throws IOException {
@@ -122,20 +107,17 @@ public class HdfsState implements State {
 
 
         void prepare(Map conf, int partitionIndex, int numPartitions) {
-            this.writeLock = new Object();
             if (this.rotationPolicy == null) {
                 throw new IllegalStateException("RotationPolicy must be specified.");
             } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
-                long limit = FileSizeRotationPolicy.Units.GB.getByteCount();
-                if(((FileSizeRotationPolicy) rotationPolicy).getMaxBytes() > limit) {
-                    LOG.warn("*** Exactly once semantics is not supported for FileSizeRotationPolicy with size > 1 GB ***");
-                    LOG.warn("Turning off exactly once.");
-                    this.exactlyOnce = false;
-                }
+                long rotationBytes = ((FileSizeRotationPolicy) rotationPolicy).getMaxBytes();
+                LOG.warn("FileSizeRotationPolicy specified with {} bytes.", rotationBytes);
+                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+                LOG.warn("Ensure that the data files do not grow too big with the FileSizeRotationPolicy.");
             } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                LOG.warn("*** Exactly once semantics is not supported for TimedRotationPolicy ***");
-                LOG.warn("Turning off exactly once.");
-                this.exactlyOnce = false;
+                LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
+                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+                LOG.warn("Ensure that the data files do not grow too big with the TimedRotationPolicy.");
             }
             if (this.fsUrl == null) {
                 throw new IllegalStateException("File system URL must be specified.");
@@ -158,19 +140,7 @@ public class HdfsState implements State {
             }
 
             if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
-                this.rotationTimer = new Timer(true);
-                TimerTask task = new TimerTask() {
-                    @Override
-                    public void run() {
-                        try {
-                            rotateOutputFile();
-                        } catch (IOException e) {
-                            LOG.warn("IOException during scheduled file rotation.", e);
-                        }
-                    }
-                };
-                this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+                ((TimedRotationPolicy) this.rotationPolicy).start();
             }
         }
 
@@ -181,13 +151,16 @@ public class HdfsState implements State {
         private void recover(String srcFile, long nBytes) {
             try {
                 Path srcPath = new Path(srcFile);
+                rotateOutputFile(false);
+                this.rotationPolicy.reset();
                 if (nBytes > 0) {
-                    rotateOutputFile(false);
-                    this.rotationPolicy.reset();
                     doRecover(srcPath, nBytes);
                     LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+                } else {
+                    LOG.info("Nothing to recover from {}", srcFile);
                 }
                 fs.delete(srcPath, false);
+                LOG.info("Deleted file {} that had partial commits.", srcFile);
             } catch (Exception e) {
                 LOG.warn("Recovery failed.", e);
                 throw new RuntimeException(e);
@@ -251,7 +224,7 @@ public class HdfsState implements State {
 
         @Override
         void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
-            LOG.info("Preparing HDFS Bolt...");
+            LOG.info("Preparing HDFS File state...");
             this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
         }
 
@@ -262,17 +235,15 @@ public class HdfsState implements State {
 
         @Override
         public void doCommit(Long txId) throws IOException {
-            synchronized (writeLock) {
-                if (this.rotationPolicy.mark(this.offset)) {
-                    rotateOutputFile();
-                    this.offset = 0;
-                    this.rotationPolicy.reset();
+            if (this.rotationPolicy.mark(this.offset)) {
+                rotateOutputFile();
+                this.offset = 0;
+                this.rotationPolicy.reset();
+            } else {
+                if (this.out instanceof HdfsDataOutputStream) {
+                    ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                 } else {
-                    if (this.out instanceof HdfsDataOutputStream) {
-                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-                    } else {
-                        this.out.hsync();
-                    }
+                    this.out.hsync();
                 }
             }
         }
@@ -308,12 +279,10 @@ public class HdfsState implements State {
 
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
-            synchronized (this.writeLock) {
-                for (TridentTuple tuple : tuples) {
-                    byte[] bytes = this.format.format(tuple);
-                    out.write(bytes);
-                    this.offset += bytes.length;
-                }
+            for (TridentTuple tuple : tuples) {
+                byte[] bytes = this.format.format(tuple);
+                out.write(bytes);
+                this.offset += bytes.length;
             }
         }
     }
@@ -381,13 +350,11 @@ public class HdfsState implements State {
 
         @Override
         public void doCommit(Long txId) throws IOException {
-            synchronized (writeLock) {
-                if (this.rotationPolicy.mark(this.writer.getLength())) {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
-                } else {
-                    this.writer.hsync();
-                }
+            if (this.rotationPolicy.mark(this.writer.getLength())) {
+                rotateOutputFile();
+                this.rotationPolicy.reset();
+            } else {
+                this.writer.hsync();
             }
         }
 
@@ -425,9 +392,7 @@ public class HdfsState implements State {
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
             for (TridentTuple tuple : tuples) {
-                synchronized (this.writeLock) {
-                    this.writer.append(this.format.key(tuple), this.format.value(tuple));
-                }
+                this.writer.append(this.format.key(tuple), this.format.value(tuple));
             }
         }
 
@@ -468,9 +433,7 @@ public class HdfsState implements State {
 
     void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         this.options.prepare(conf, partitionIndex, numPartitions);
-        if (options.isExactlyOnce()) {
-            initLastTxn(conf, partitionIndex);
-        }
+        initLastTxn(conf, partitionIndex);
     }
 
     private TxnRecord readTxnRecord(Path path) throws IOException {
@@ -558,15 +521,13 @@ public class HdfsState implements State {
 
     @Override
     public void beginCommit(Long txId) {
-        if (options.isExactlyOnce()) {
-            if (txId <= lastSeenTxn.txnid) {
-                LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
-                long start = System.currentTimeMillis();
-                options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
-                LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
-            }
-            updateIndex(txId);
+        if (txId <= lastSeenTxn.txnid) {
+            LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+            long start = System.currentTimeMillis();
+            options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+            LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
         }
+        updateIndex(txId);
     }
 
     @Override
@@ -587,4 +548,11 @@ public class HdfsState implements State {
             throw new FailedException(e);
         }
     }
+
+    /**
+     * Closes the current output file; exposed for unit tests.
+     */
+    void close() throws IOException {
+        this.options.closeOutputFile();
+    }
 }
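
The recover() path above reduces to "carry forward only the bytes written before the
last committed transaction, then delete the partial file". Below is a standalone
sketch of that copy-up-to-offset idea using plain java.io streams; it is an
illustration only, not the committed implementation, which goes through the HDFS
FileSystem in doRecover():

    import java.io.*;

    class RecoverySketch {
        // Copy the first nBytes of src into dest, discarding everything written by
        // the partially committed batch after the last committed offset.
        static void recoverUpTo(File src, File dest, long nBytes) throws IOException {
            try (InputStream in = new FileInputStream(src);
                 OutputStream out = new FileOutputStream(dest)) {
                byte[] buf = new byte[8192];
                long remaining = nBytes;
                int read;
                while (remaining > 0 &&
                       (read = in.read(buf, 0, (int) Math.min(buf.length, remaining))) != -1) {
                    out.write(buf, 0, read); // only bytes up to the committed offset
                    remaining -= read;
                }
            }
        }
    }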

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index f407d1d..74e7fab 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -19,6 +19,10 @@ package org.apache.storm.hdfs.trident.rotation;
 
 import storm.trident.tuple.TridentTuple;
 
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 
 public class TimedRotationPolicy implements FileRotationPolicy {
 
@@ -41,6 +45,9 @@ public class TimedRotationPolicy implements FileRotationPolicy {
     }
 
     private long interval;
+    private Timer rotationTimer;
+    private AtomicBoolean rotationTimerTriggered = new AtomicBoolean();
+
 
     public TimedRotationPolicy(float count, TimeUnit units){
         this.interval = (long)(count * units.getMilliSeconds());
@@ -54,12 +61,12 @@ public class TimedRotationPolicy implements FileRotationPolicy {
      */
     @Override
     public boolean mark(TridentTuple tuple, long offset) {
-        return false;
+        return rotationTimerTriggered.get();
     }
 
     @Override
     public boolean mark(long offset) {
-        return false;
+        return rotationTimerTriggered.get();
     }
 
     /**
@@ -67,10 +74,24 @@ public class TimedRotationPolicy implements FileRotationPolicy {
      */
     @Override
     public void reset() {
-
+        rotationTimerTriggered.set(false);
     }
 
     public long getInterval(){
         return this.interval;
     }
+
+    /**
+     * Start the timer to run at fixed intervals.
+     */
+    public void start() {
+        rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                rotationTimerTriggered.set(true);
+            }
+        };
+        rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
new file mode 100644
index 0000000..e1f45df
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
@@ -0,0 +1,206 @@
+package org.apache.storm.hdfs.trident;
+
+import backtype.storm.Config;
+import backtype.storm.tuple.Fields;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class HdfsStateTest {
+
+    private static final String TEST_OUT_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "trident-unit-test").toString();
+
+    private static final String FILE_NAME_PREFIX = "hdfs-data-";
+    private static final String TEST_TOPOLOGY_NAME = "test-topology";
+    private static final String INDEX_FILE_PREFIX = ".index.";
+    private final TestFileNameFormat fileNameFormat = new TestFileNameFormat();
+
+    private static class TestFileNameFormat implements FileNameFormat {
+        private String currentFileName = "";
+
+        @Override
+        public void prepare(Map conf, int partitionIndex, int numPartitions) {
+
+        }
+
+        @Override
+        public String getName(long rotation, long timeStamp) {
+            currentFileName = FILE_NAME_PREFIX + Long.toString(rotation);
+            return currentFileName;
+        }
+
+        @Override
+        public String getPath() {
+            return TEST_OUT_DIR;
+        }
+
+        public String getCurrentFileName() {
+            return currentFileName;
+        }
+    }
+
+    private HdfsState createHdfsState() {
+
+        Fields hdfsFields = new Fields("f1");
+
+        RecordFormat recordFormat = new DelimitedRecordFormat().withFields(hdfsFields);
+
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+        HdfsState.Options options = new HdfsState.HdfsFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(recordFormat)
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("file://" + TEST_OUT_DIR);
+
+        Map<String, String> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_NAME, TEST_TOPOLOGY_NAME);
+
+        HdfsState state = new HdfsState(options);
+        state.prepare(conf, null, 0, 1);
+        return state;
+    }
+
+    private List<TridentTuple> createMockTridentTuples(int count) {
+        TridentTuple tuple = mock(TridentTuple.class);
+        when(tuple.getValueByField(any(String.class))).thenReturn("data");
+        List<TridentTuple> tuples = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            tuples.add(tuple);
+        }
+        return tuples;
+    }
+
+    private List<String> getLinesFromCurrentDataFile() throws IOException {
+        Path dataFile = Paths.get(TEST_OUT_DIR, fileNameFormat.getCurrentFileName());
+        List<String> lines = Files.readAllLines(dataFile, Charset.defaultCharset());
+        return lines;
+    }
+
+    @Before
+    public void setUp() {
+        FileUtils.deleteQuietly(new File(TEST_OUT_DIR));
+    }
+
+
+    @Test
+    public void testPrepare() throws Exception {
+        HdfsState state = createHdfsState();
+        Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+        File hdfsDataFile = Paths.get(TEST_OUT_DIR, FILE_NAME_PREFIX + "0").toFile();
+        Assert.assertTrue(files.contains(hdfsDataFile));
+    }
+
+    @Test
+    public void testIndexFileCreation() throws Exception {
+        HdfsState state = createHdfsState();
+        state.beginCommit(1L);
+        Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+        File hdfsIndexFile = Paths.get(TEST_OUT_DIR, INDEX_FILE_PREFIX + TEST_TOPOLOGY_NAME + ".0").toFile();
+        Assert.assertTrue(files.contains(hdfsIndexFile));
+    }
+
+    @Test
+    public void testUpdateState() throws Exception {
+        HdfsState state = createHdfsState();
+        state.beginCommit(1L);
+        int tupleCount = 100;
+        state.updateState(createMockTridentTuples(tupleCount), null);
+        state.commit(1L);
+        state.close();
+        List<String> lines = getLinesFromCurrentDataFile();
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < tupleCount; i++) {
+            expected.add("data");
+        }
+        Assert.assertEquals(tupleCount, lines.size());
+        Assert.assertEquals(expected, lines);
+    }
+
+    @Test
+    public void testRecoverOneBatch() throws Exception {
+        HdfsState state = createHdfsState();
+        // batch 1 is played with 25 tuples initially.
+        state.beginCommit(1L);
+        state.updateState(createMockTridentTuples(25), null);
+        // batch 1 is replayed with 50 tuples.
+        int replayBatchSize = 50;
+        state.beginCommit(1L);
+        state.updateState(createMockTridentTuples(replayBatchSize), null);
+        state.commit(1L);
+        // close the state to force flush
+        state.close();
+        // Ensure that the original batch1 is discarded and new one is persisted.
+        List<String> lines = getLinesFromCurrentDataFile();
+        Assert.assertEquals(replayBatchSize, lines.size());
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < replayBatchSize; i++) {
+            expected.add("data");
+        }
+        Assert.assertEquals(expected, lines);
+    }
+
+    @Test
+    public void testRecoverMultipleBatches() throws Exception {
+        HdfsState state = createHdfsState();
+
+        // batch 1
+        int batch1Count = 10;
+        state.beginCommit(1L);
+        state.updateState(createMockTridentTuples(batch1Count), null);
+        state.commit(1L);
+
+        // batch 2
+        int batch2Count = 20;
+        state.beginCommit(2L);
+        state.updateState(createMockTridentTuples(batch2Count), null);
+        state.commit(2L);
+
+        // batch 3
+        int batch3Count = 30;
+        state.beginCommit(3L);
+        state.updateState(createMockTridentTuples(batch3Count), null);
+        state.commit(3L);
+
+        // batch 3 replayed with 40 tuples
+        int batch3ReplayCount = 40;
+        state.beginCommit(3L);
+        state.updateState(createMockTridentTuples(batch3ReplayCount), null);
+        state.commit(3L);
+        state.close();
+        /*
+         * Total tuples should be:
+         * recovered (batch-1 + batch-2) + replayed (batch-3)
+         */
+        List<String> lines = getLinesFromCurrentDataFile();
+        int preReplayCount = batch1Count + batch2Count + batch3Count;
+        int expectedTupleCount = batch1Count + batch2Count + batch3ReplayCount;
+
+        Assert.assertNotEquals(preReplayCount, lines.size());
+        Assert.assertEquals(expectedTupleCount, lines.size());
+    }
+}
\ No newline at end of file