You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/20 14:33:51 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2017 Fixed failing tests.

Repository: apex-malhar
Updated Branches:
  refs/heads/master 2cf8bade8 -> 0f8442472


APEXMALHAR-2017 Fixed failing tests.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0f844247
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0f844247
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0f844247

Branch: refs/heads/master
Commit: 0f84424720268e1eebb06992bcda00d2e7d591d4
Parents: 28a7e34
Author: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Authored: Mon Oct 3 09:01:30 2016 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Wed Oct 19 22:28:34 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/contrib/hive/HiveMockTest.java  | 16 ++++++++++++-
 .../lib/io/fs/AbstractFileInputOperator.java    |  3 +--
 .../lib/io/fs/AbstractFileOutputOperator.java   |  6 +++--
 .../lib/io/fs/AbstractReconciler.java           |  2 +-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 25 ++++++++++----------
 .../AbstractWindowFileOutputOperatorTest.java   |  5 ++--
 .../lib/io/fs/FSInputModuleAppTest.java         |  1 +
 .../lib/fs/GenericFileOutputOperatorTest.java   |  6 +++++
 8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
index 3c64bdf..4ec92c9 100755
--- a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
+++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
@@ -50,7 +50,6 @@ import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitio
 import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 
-
 public class HiveMockTest extends HiveTestService
 {
   public static final String APP_ID = "HiveOperatorTest";
@@ -250,6 +249,11 @@ public class HiveMockTest extends HiveTestService
       }
 
       fsRolling.endWindow();
+
+      if (wid == 6) {
+        fsRolling.beforeCheckpoint(wid);
+        fsRolling.checkpointed(wid);
+      }
     }
 
     fsRolling.teardown();
@@ -353,6 +357,11 @@ public class HiveMockTest extends HiveTestService
       }
 
       fsRolling.endWindow();
+
+      if (wid == 6) {
+        fsRolling.beforeCheckpoint(wid);
+        fsRolling.checkpointed(wid);
+      }
     }
 
     fsRolling.teardown();
@@ -521,6 +530,11 @@ public class HiveMockTest extends HiveTestService
 
       fsRolling.endWindow();
 
+      if ((wid == 6) || (wid == 9)) {
+        fsRolling.beforeCheckpoint(wid);
+        fsRolling.checkpointed(wid);
+      }
+
       if (wid == 9) {
         Kryo kryo = new Kryo();
         FieldSerializer<HiveOperator> f1 = (FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 14cabfc..d4ee03b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -37,7 +37,6 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import javax.validation.OverridesAttribute;
 import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
@@ -99,7 +98,7 @@ import com.datatorrent.lib.util.KryoCloneUtils;
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener,
-  Operator.CheckpointListener, Operator.CheckpointNotificationListener
+    Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index f703a19..27a56cd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -262,7 +262,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   private Long expireStreamAfterAccessMillis;
   private final Set<String> filesWithOpenStreams;
 
-  private boolean initializeContext;
+  private transient boolean initializeContext;
 
   /**
    * This input port receives incoming tuples.
@@ -968,6 +968,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   @Override
   public void beginWindow(long windowId)
   {
+    // All the filter state needs to be flushed to the disk. Not all filters allow a flush option, so the filters have
+    // to be closed and reopened. If no filter is being used then it is essentially a noop as the underlying
+    // FSDataOutputStream is not being closed in this operation.
     if (initializeContext) {
       try {
         Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
@@ -1240,7 +1243,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
         long start = System.currentTimeMillis();
         streamContext.finalizeContext();
         totalWritingTime += System.currentTimeMillis() - start;
-        //streamContext.resetFilter();
         // Re-initialize context when next window starts after checkpoint
         initializeContext = true;
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index c12becd..0d67c31 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.api.Operator.CheckpointNotificationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.NameableThreadFactory;
 import com.datatorrent.netlet.util.DTThrowable;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 5b58121..8f0fbb0 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -66,6 +66,7 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.RandomWordGenerator;
 import com.datatorrent.lib.util.TestUtils;
@@ -245,7 +246,7 @@ public class AbstractFileOutputOperatorTest
    */
   public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId)
   {
-    if (windowId >= 0) {
+    if (windowId >= Stateless.WINDOW_ID) {
       writer.beforeCheckpoint(windowId);
     }
     Kryo kryo = new Kryo();
@@ -421,7 +422,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -677,7 +678,7 @@ public class AbstractFileOutputOperatorTest
     writer.requestFinalize(EVEN_FILE);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(4);
@@ -821,7 +822,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(2);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(3);
@@ -868,8 +869,8 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(4);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
-    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
+    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, Stateless.WINDOW_ID);
 
     LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
 
@@ -1134,7 +1135,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1231,7 +1232,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1278,7 +1279,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.process(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.process(2);
@@ -1369,7 +1370,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(3);
     writer.input.put(4);
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
 
     writer.input.put(3);
     writer.input.put(4);
@@ -1766,13 +1767,11 @@ public class AbstractFileOutputOperatorTest
       for (int j = 0; j < 1000; ++j) {
         writer.input.put(i);
       }
-      //writer.endWindow();
+      writer.endWindow();
       if ((i % 2) == 1) {
         writer.beforeCheckpoint(i);
         evenOffsets.add(evenCounterContext.getCounter());
         oddOffsets.add(oddCounterContext.getCounter());
-        //evenOffsets.add(evenFile.length());
-        //oddOffsets.add(oddFile.length());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
index bcb1bc3..daebecb 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
@@ -22,6 +22,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.Description;
 
+import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
 import com.datatorrent.lib.util.TestUtils.TestInfo;
@@ -70,7 +71,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.input.process("window 0");
     oper.endWindow();
 
-    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0);
+    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID);
 
     oper.beginWindow(1);
     oper.input.process("window 1");
@@ -110,7 +111,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.beginWindow(1);
     oper.input.process("1");
 
-    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1);
+    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID);
 
     oper.input.process("1");
     oper.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
index 4213a00..4042d3c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
@@ -99,6 +99,7 @@ public class FSInputModuleAppTest
     conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
     conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4");
     conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
+    conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","10");
 
     LocalMode lma = LocalMode.newInstance();
     lma.prepareDAG(app, conf);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
index 6082f57..8b8ed01 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
@@ -62,6 +62,7 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes
       }
       writer.endWindow();
     }
+    checkpoint(writer, 10);
     writer.committed(10);
 
     for (int i = 13; i <= 26; i++) {
@@ -71,7 +72,10 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes
       }
       writer.endWindow();
     }
+    checkpoint(writer, 20);
     writer.committed(20);
+
+    checkpoint(writer, 26);
     writer.committed(26);
 
     String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
@@ -108,8 +112,10 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes
       }
       writer.endWindow();
       if (i % 10 == 0) {
+        checkpoint(writer, 10);
         writer.committed(10);
       }
+      checkpoint(writer, 24);
     }
     writer.committed(tuples.length);
 


[2/2] apex-malhar git commit: Using CheckpointNotificationListener and beforeCheckpoint callback to do IO in a more optimized fashion

Posted by th...@apache.org.
Using CheckpointNotificationListener and beforeCheckpoint callback to do IO in a more optimized fashion


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/28a7e347
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/28a7e347
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/28a7e347

Branch: refs/heads/master
Commit: 28a7e347691dc5dbe364888c4520f5c31139bc39
Parents: 2cf8bad
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Mar 16 20:57:39 2016 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Wed Oct 19 22:28:34 2016 -0700

----------------------------------------------------------------------
 .../hbase/AbstractHBasePutOutputOperator.java   | 57 +++++++-------------
 .../kinesis/AbstractKinesisInputOperator.java   |  8 ++-
 .../kinesis/AbstractKinesisOutputOperator.java  | 18 ++++++-
 .../rabbitmq/AbstractRabbitMQInputOperator.java |  7 ++-
 .../redis/AbstractRedisInputOperator.java       |  9 +++-
 .../contrib/splunk/SplunkTcpOutputOperator.java | 17 ++++--
 .../hive/AbstractFSRollingOutputOperator.java   |  5 +-
 .../lib/io/fs/AbstractFileInputOperator.java    | 10 +++-
 .../lib/io/fs/AbstractFileOutputOperator.java   | 49 ++++++++++-------
 .../lib/io/fs/AbstractReconciler.java           | 12 +++--
 .../com/datatorrent/lib/io/fs/FileSplitter.java |  7 ++-
 .../lib/io/fs/FileSplitterInput.java            |  7 ++-
 .../lib/io/jms/AbstractJMSInputOperator.java    |  7 ++-
 .../lib/join/AbstractJoinOperator.java          |  7 ++-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 47 ++++++++++------
 .../AbstractWindowFileOutputOperatorTest.java   |  4 +-
 16 files changed, 171 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
index 3973008..7f93394 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
@@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
 
 import javax.validation.constraints.Min;
 
+import com.datatorrent.api.Operator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,27 +53,21 @@ import com.datatorrent.netlet.util.DTThrowable;
  *            The tuple type
  * @since 1.0.2
  */
-public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> {
+public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> implements Operator.CheckpointNotificationListener {
   private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBasePutOutputOperator.class);
-  public static final int DEFAULT_BATCH_SIZE = 1000;
-  private int batchSize = DEFAULT_BATCH_SIZE;
-  protected int unCommittedSize = 0;
 
-  public AbstractHBasePutOutputOperator() {
+  public AbstractHBasePutOutputOperator()
+  {
     store = new HBaseStore();
   }
 
   @Override
-  public void processTuple(T tuple) {
+  public void processTuple(T tuple)
+  {
     HTable table = store.getTable();
     Put put = operationPut(tuple);
     try {
       table.put(put);
-      if( ++unCommittedSize >= batchSize )
-      {
-        table.flushCommits();
-        unCommittedSize = 0;
-      }
     } catch (RetriesExhaustedWithDetailsException e) {
       logger.error("Could not output tuple", e);
       DTThrowable.rethrow(e);
@@ -80,46 +75,30 @@ public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOut
       logger.error("Could not output tuple", e);
       DTThrowable.rethrow(e);
     }
-
   }
 
   @Override
-  public void endWindow()
+  public void beforeCheckpoint(long windowId)
   {
-    try
-    {
-      if( unCommittedSize > 0 ) {
-        store.getTable().flushCommits();
-        unCommittedSize = 0;
-      }
-    }
-    catch (RetriesExhaustedWithDetailsException e) {
-      logger.error("Could not output tuple", e);
-      DTThrowable.rethrow(e);
+    try {
+      store.getTable().flushCommits();
     } catch (InterruptedIOException e) {
-      logger.error("Could not output tuple", e);
+      DTThrowable.rethrow(e);
+    } catch (RetriesExhaustedWithDetailsException e) {
       DTThrowable.rethrow(e);
     }
   }
 
-  public abstract Put operationPut(T t);
+  @Override
+  public void checkpointed(long l) {
 
-  /**
-   * the batch size save flush data
-   */
-  @Min(1)
-  public int getBatchSize()
-  {
-    return batchSize;
   }
 
-  /**
-   * the batch size save flush data
-   */
-  public void setBatchSize(int batchSize)
-  {
-    this.batchSize = batchSize;
-  }
+  @Override
+  public void committed(long l) {
 
+  }
 
+  public abstract Put operationPut(T t);
+  
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index c03df21..fc10bea 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -86,7 +86,7 @@ class KinesisPair <F, S> extends Pair<F, S>
  * @since 2.0.0
  */
 @SuppressWarnings("rawtypes")
-public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointListener
+public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointNotificationListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisInputOperator.class);
 
@@ -530,6 +530,12 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   public void checkpointed(long windowId)
   {
   }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
   /**
    * Implement ActivationListener Interface.
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
index 43fc62a..d6f9d36 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
@@ -49,7 +49,7 @@ import java.util.List;
  * @param <T>
  * @since 2.0.0
  */
-public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
+public abstract class AbstractKinesisOutputOperator<V, T> implements Operator, Operator.CheckpointNotificationListener
 {
   private static final Logger logger = LoggerFactory.getLogger( AbstractKinesisOutputOperator.class );
   protected String streamName;
@@ -91,6 +91,10 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
   {
   }
 
+  @Override
+  public void endWindow() {
+  }
+
   /**
    * Implement Component Interface.
    */
@@ -103,7 +107,7 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
    * Implement Operator Interface.
    */
   @Override
-  public void endWindow()
+  public void beforeCheckpoint(long windowId)
   {
     if (isBatchProcessing && putRecordsRequestEntryList.size() != 0) {
       try {
@@ -114,6 +118,16 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
     }
   }
 
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+  }
+
   /**
    * Implement Component Interface.
    *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 08157bc..672122c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -77,7 +77,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
  */
 public abstract class AbstractRabbitMQInputOperator<T> implements
     InputOperator, Operator.ActivationListener<OperatorContext>,
-    Operator.CheckpointListener
+    Operator.CheckpointNotificationListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
   @NotNull
@@ -312,6 +312,11 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 0b12574..092a3c6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -31,7 +31,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
 
-import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
@@ -47,7 +47,7 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
  *          The tuple type.
  * @since 0.9.3
  */
-public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointListener
+public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointNotificationListener
 {
   protected transient List<String> keys = new ArrayList<String>();
   protected transient Integer scanOffset;
@@ -225,6 +225,11 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
   abstract public void processTuples();
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
index cb81375..f7e98b6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
@@ -19,6 +19,7 @@
 package com.datatorrent.contrib.splunk;
 
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
 import com.datatorrent.lib.db.AbstractStoreOutputOperator;
 import com.splunk.TcpInput;
 
@@ -34,7 +35,7 @@ import java.net.Socket;
  * @tags splunk
  * @since 1.0.4
  */
-public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> {
+public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> implements Operator.CheckpointNotificationListener {
 
   private String tcpPort;
   private transient Socket socket;
@@ -75,8 +76,8 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S
   }
 
   @Override
-  public void endWindow() {
-
+  public void beforeCheckpoint(long windowId)
+  {
     try {
       stream.flush();
     } catch (IOException e) {
@@ -85,6 +86,16 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S
   }
 
   @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+  }
+
+  @Override
   public void teardown() {
 
     super.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
index 3c9c4da..37b5f2e 100755
--- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
@@ -37,7 +37,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 import com.datatorrent.netlet.util.DTThrowable;
@@ -52,8 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  *
  * @since 2.1.0
  */
-public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T>
-    implements CheckpointListener
+public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> implements CheckpointNotificationListener
 {
   private transient String outputFilePath;
   protected MutableInt partNumber;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 9e80b4e..14cabfc 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.validation.OverridesAttribute;
 import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
@@ -97,8 +98,8 @@ import com.datatorrent.lib.util.KryoCloneUtils;
  * @since 1.0.2
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractFileInputOperator<T>
-    implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener
+public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener,
+  Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class);
 
@@ -909,6 +910,11 @@ public abstract class AbstractFileInputOperator<T>
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index ab8bedc..f703a19 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -110,7 +110,7 @@ import com.datatorrent.lib.counters.BasicCounters;
  * @since 2.0.0
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener
+public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class);
 
@@ -262,6 +262,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   private Long expireStreamAfterAccessMillis;
   private final Set<String> filesWithOpenStreams;
 
+  private boolean initializeContext;
+
   /**
    * This input port receives incoming tuples.
    */
@@ -966,13 +968,16 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   @Override
   public void beginWindow(long windowId)
   {
-    try {
-      Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
-      for (FSFilterStreamContext streamContext : openStreams.values()) {
-        streamContext.initializeContext();
+    if (initializeContext) {
+      try {
+        Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
+        for (FSFilterStreamContext streamContext : openStreams.values()) {
+          streamContext.initializeContext();
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+      initializeContext = false;
     }
     currentWindow = windowId;
   }
@@ -980,18 +985,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   @Override
   public void endWindow()
   {
-    try {
-      Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
-      for (FSFilterStreamContext streamContext: openStreams.values()) {
-        long start = System.currentTimeMillis();
-        streamContext.finalizeContext();
-        totalWritingTime += System.currentTimeMillis() - start;
-        //streamContext.resetFilter();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
     if (rotationWindows > 0) {
       if (++rotationCount == rotationWindows) {
         rotationCount = 0;
@@ -1239,6 +1232,24 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+    try {
+      Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
+      for (FSFilterStreamContext streamContext: openStreams.values()) {
+        long start = System.currentTimeMillis();
+        streamContext.finalizeContext();
+        totalWritingTime += System.currentTimeMillis() - start;
+        //streamContext.resetFilter();
+        // Re-initialize context when next window starts after checkpoint
+        initializeContext = true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 945c000..c12becd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -25,17 +25,14 @@ import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
-
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.NameableThreadFactory;
 import com.datatorrent.netlet.util.DTThrowable;
@@ -55,7 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 2.0.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointListener, IdleTimeHandler
+public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointNotificationListener, IdleTimeHandler
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class);
   public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
@@ -125,6 +122,11 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 4bb53e5..b9594b3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -79,7 +79,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
 @Deprecated
-public class FileSplitter implements InputOperator, Operator.CheckpointListener
+public class FileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   protected Long blockSize;
   private int sequenceNo;
@@ -380,6 +380,11 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 2d290cd..745f953 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -76,7 +76,7 @@ import com.datatorrent.api.annotation.Stateless;
  * @since 2.0.0
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
-public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener
+public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   @NotNull
   private WindowDataManager windowDataManager;
@@ -219,6 +219,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index 2b8b58d..72bf63c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -80,7 +80,7 @@ import com.datatorrent.lib.counters.BasicCounters;
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
 public abstract class AbstractJMSInputOperator<T> extends JMSBase
     implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener,
-    Operator.IdleTimeHandler, Operator.CheckpointListener
+    Operator.IdleTimeHandler, Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k
 
@@ -395,6 +395,11 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
index 89df25e..d0f722d 100644
--- a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
@@ -64,7 +64,7 @@ import com.datatorrent.common.util.BaseOperator;
  * @since 3.4.0
  */
 @InterfaceStability.Unstable
-public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
+public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointNotificationListener
 {
   @AutoMetric
   private long tuplesJoinedPerSec;
@@ -226,6 +226,11 @@ public abstract class AbstractJoinOperator<T> extends BaseOperator implements Op
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
     leftStore.getStore().checkpointed(windowId);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 03f3bf6..5b58121 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -243,8 +243,11 @@ public class AbstractFileOutputOperatorTest
    * @param writer The writer to checkpoint.
    * @return new writer.
    */
-  public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer)
+  public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId)
   {
+    if (windowId >= 0) {
+      writer.beforeCheckpoint(windowId);
+    }
     Kryo kryo = new Kryo();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     Output loutput = new Output(bos);
@@ -418,7 +421,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -577,6 +580,7 @@ public class AbstractFileOutputOperatorTest
 
     writer.requestFinalize(EVEN_FILE);
     writer.requestFinalize(ODD_FILE);
+    writer.beforeCheckpoint(1);
     writer.committed(1);
   }
 
@@ -603,6 +607,7 @@ public class AbstractFileOutputOperatorTest
 
     writer.requestFinalize(ODD_FILE);
     writer.requestFinalize(EVEN_FILE);
+    writer.beforeCheckpoint(1);
     writer.committed(1);
   }
 
@@ -672,7 +677,7 @@ public class AbstractFileOutputOperatorTest
     writer.requestFinalize(EVEN_FILE);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(4);
@@ -690,6 +695,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(8);
     writer.input.put(9);
     writer.endWindow();
+    writer.beforeCheckpoint(2);
     writer.committed(2);
   }
 
@@ -815,7 +821,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(2);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(3);
@@ -862,8 +868,8 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(4);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
-    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1);
 
     LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
 
@@ -1128,7 +1134,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1225,7 +1231,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1272,7 +1278,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.process(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.process(2);
@@ -1363,7 +1369,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(3);
     writer.input.put(4);
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1);
 
     writer.input.put(3);
     writer.input.put(4);
@@ -1554,8 +1560,11 @@ public class AbstractFileOutputOperatorTest
         writer.input.put(i);
       }
       writer.endWindow();
-      evenOffsets.add(evenFile.length());
-      oddOffsets.add(oddFile.length());
+      if ((i % 2) == 1) {
+        writer.beforeCheckpoint(i);
+        evenOffsets.add(evenFile.length());
+        oddOffsets.add(oddFile.length());
+      }
     }
 
     writer.teardown();
@@ -1580,6 +1589,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(2);
     writer.input.put(3);
     writer.endWindow();
+    writer.beforeCheckpoint(0);
 
     //failure and restored
     writer.setup(testMeta.testOperatorContext);
@@ -1756,11 +1766,14 @@ public class AbstractFileOutputOperatorTest
       for (int j = 0; j < 1000; ++j) {
         writer.input.put(i);
       }
-      writer.endWindow();
-      evenOffsets.add(evenCounterContext.getCounter());
-      oddOffsets.add(oddCounterContext.getCounter());
-      //evenOffsets.add(evenFile.length());
-      //oddOffsets.add(oddFile.length());
+      //writer.endWindow();
+      if ((i % 2) == 1) {
+        writer.beforeCheckpoint(i);
+        evenOffsets.add(evenCounterContext.getCounter());
+        oddOffsets.add(oddCounterContext.getCounter());
+        //evenOffsets.add(evenFile.length());
+        //oddOffsets.add(oddFile.length());
+      }
     }
 
     writer.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
index 32c32f7..bcb1bc3 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
@@ -70,7 +70,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.input.process("window 0");
     oper.endWindow();
 
-    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper);
+    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0);
 
     oper.beginWindow(1);
     oper.input.process("window 1");
@@ -110,7 +110,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.beginWindow(1);
     oper.input.process("1");
 
-    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper);
+    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1);
 
     oper.input.process("1");
     oper.teardown();