You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/20 14:33:52 UTC

[2/2] apex-malhar git commit: Using CheckpointNotificationListener and beforeCheckpoint callback to do IO in a more optimized fashion

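Use CheckpointNotificationListener and its beforeCheckpoint callback so that output operators flush buffered data once per checkpoint instead of at the end of every window (or every batch). This applies to the HBase put operator, the Kinesis output operator, the Splunk TCP output operator and AbstractFileOutputOperator, and reduces the amount of I/O. Other operators that previously implemented CheckpointListener now implement CheckpointNotificationListener with an empty beforeCheckpoint.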


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/28a7e347
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/28a7e347
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/28a7e347

Branch: refs/heads/master
Commit: 28a7e347691dc5dbe364888c4520f5c31139bc39
Parents: 2cf8bad
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Mar 16 20:57:39 2016 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Wed Oct 19 22:28:34 2016 -0700

----------------------------------------------------------------------
 .../hbase/AbstractHBasePutOutputOperator.java   | 57 +++++++-------------
 .../kinesis/AbstractKinesisInputOperator.java   |  8 ++-
 .../kinesis/AbstractKinesisOutputOperator.java  | 18 ++++++-
 .../rabbitmq/AbstractRabbitMQInputOperator.java |  7 ++-
 .../redis/AbstractRedisInputOperator.java       |  9 +++-
 .../contrib/splunk/SplunkTcpOutputOperator.java | 17 ++++--
 .../hive/AbstractFSRollingOutputOperator.java   |  5 +-
 .../lib/io/fs/AbstractFileInputOperator.java    | 10 +++-
 .../lib/io/fs/AbstractFileOutputOperator.java   | 49 ++++++++++-------
 .../lib/io/fs/AbstractReconciler.java           | 12 +++--
 .../com/datatorrent/lib/io/fs/FileSplitter.java |  7 ++-
 .../lib/io/fs/FileSplitterInput.java            |  7 ++-
 .../lib/io/jms/AbstractJMSInputOperator.java    |  7 ++-
 .../lib/join/AbstractJoinOperator.java          |  7 ++-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 47 ++++++++++------
 .../AbstractWindowFileOutputOperatorTest.java   |  4 +-
 16 files changed, 171 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
index 3973008..7f93394 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java
@@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
 
 import javax.validation.constraints.Min;
 
+import com.datatorrent.api.Operator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,27 +53,21 @@ import com.datatorrent.netlet.util.DTThrowable;
  *            The tuple type
  * @since 1.0.2
  */
-public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> {
+public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> implements Operator.CheckpointNotificationListener {
   private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBasePutOutputOperator.class);
-  public static final int DEFAULT_BATCH_SIZE = 1000;
-  private int batchSize = DEFAULT_BATCH_SIZE;
-  protected int unCommittedSize = 0;
 
-  public AbstractHBasePutOutputOperator() {
+  public AbstractHBasePutOutputOperator()
+  {
     store = new HBaseStore();
   }
 
   @Override
-  public void processTuple(T tuple) {
+  public void processTuple(T tuple)
+  {
     HTable table = store.getTable();
     Put put = operationPut(tuple);
     try {
       table.put(put);
-      if( ++unCommittedSize >= batchSize )
-      {
-        table.flushCommits();
-        unCommittedSize = 0;
-      }
     } catch (RetriesExhaustedWithDetailsException e) {
       logger.error("Could not output tuple", e);
       DTThrowable.rethrow(e);
@@ -80,46 +75,30 @@ public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOut
       logger.error("Could not output tuple", e);
       DTThrowable.rethrow(e);
     }
-
   }
 
   @Override
-  public void endWindow()
+  public void beforeCheckpoint(long windowId)
   {
-    try
-    {
-      if( unCommittedSize > 0 ) {
-        store.getTable().flushCommits();
-        unCommittedSize = 0;
-      }
-    }
-    catch (RetriesExhaustedWithDetailsException e) {
-      logger.error("Could not output tuple", e);
-      DTThrowable.rethrow(e);
+    try {
+      store.getTable().flushCommits();
     } catch (InterruptedIOException e) {
-      logger.error("Could not output tuple", e);
+      DTThrowable.rethrow(e);
+    } catch (RetriesExhaustedWithDetailsException e) {
       DTThrowable.rethrow(e);
     }
   }
 
-  public abstract Put operationPut(T t);
+  @Override
+  public void checkpointed(long l) {
 
-  /**
-   * the batch size save flush data
-   */
-  @Min(1)
-  public int getBatchSize()
-  {
-    return batchSize;
   }
 
-  /**
-   * the batch size save flush data
-   */
-  public void setBatchSize(int batchSize)
-  {
-    this.batchSize = batchSize;
-  }
+  @Override
+  public void committed(long l) {
 
+  }
 
+  public abstract Put operationPut(T t);
+  
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index c03df21..fc10bea 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -86,7 +86,7 @@ class KinesisPair <F, S> extends Pair<F, S>
  * @since 2.0.0
  */
 @SuppressWarnings("rawtypes")
-public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointListener
+public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointNotificationListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisInputOperator.class);
 
@@ -530,6 +530,12 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   public void checkpointed(long windowId)
   {
   }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
   /**
    * Implement ActivationListener Interface.
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
index 43fc62a..d6f9d36 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
@@ -49,7 +49,7 @@ import java.util.List;
  * @param <T>
  * @since 2.0.0
  */
-public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
+public abstract class AbstractKinesisOutputOperator<V, T> implements Operator, Operator.CheckpointNotificationListener
 {
   private static final Logger logger = LoggerFactory.getLogger( AbstractKinesisOutputOperator.class );
   protected String streamName;
@@ -91,6 +91,10 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
   {
   }
 
+  @Override
+  public void endWindow() {
+  }
+
   /**
    * Implement Component Interface.
    */
@@ -103,7 +107,7 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
    * Implement Operator Interface.
    */
   @Override
-  public void endWindow()
+  public void beforeCheckpoint(long windowId)
   {
     if (isBatchProcessing && putRecordsRequestEntryList.size() != 0) {
       try {
@@ -114,6 +118,16 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
     }
   }
 
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+  }
+
   /**
    * Implement Component Interface.
    *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 08157bc..672122c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -77,7 +77,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
  */
 public abstract class AbstractRabbitMQInputOperator<T> implements
     InputOperator, Operator.ActivationListener<OperatorContext>,
-    Operator.CheckpointListener
+    Operator.CheckpointNotificationListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
   @NotNull
@@ -312,6 +312,11 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 0b12574..092a3c6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -31,7 +31,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
 
-import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
@@ -47,7 +47,7 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
  *          The tuple type.
  * @since 0.9.3
  */
-public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointListener
+public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointNotificationListener
 {
   protected transient List<String> keys = new ArrayList<String>();
   protected transient Integer scanOffset;
@@ -225,6 +225,11 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
   abstract public void processTuples();
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
index cb81375..f7e98b6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java
@@ -19,6 +19,7 @@
 package com.datatorrent.contrib.splunk;
 
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
 import com.datatorrent.lib.db.AbstractStoreOutputOperator;
 import com.splunk.TcpInput;
 
@@ -34,7 +35,7 @@ import java.net.Socket;
  * @tags splunk
  * @since 1.0.4
  */
-public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> {
+public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> implements Operator.CheckpointNotificationListener {
 
   private String tcpPort;
   private transient Socket socket;
@@ -75,8 +76,8 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S
   }
 
   @Override
-  public void endWindow() {
-
+  public void beforeCheckpoint(long windowId)
+  {
     try {
       stream.flush();
     } catch (IOException e) {
@@ -85,6 +86,16 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S
   }
 
   @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+  }
+
+  @Override
   public void teardown() {
 
     super.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
index 3c9c4da..37b5f2e 100755
--- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
@@ -37,7 +37,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 import com.datatorrent.netlet.util.DTThrowable;
@@ -52,8 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  *
  * @since 2.1.0
  */
-public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T>
-    implements CheckpointListener
+public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> implements CheckpointNotificationListener
 {
   private transient String outputFilePath;
   protected MutableInt partNumber;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 9e80b4e..14cabfc 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.validation.OverridesAttribute;
 import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
@@ -97,8 +98,8 @@ import com.datatorrent.lib.util.KryoCloneUtils;
  * @since 1.0.2
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractFileInputOperator<T>
-    implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener
+public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener,
+  Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class);
 
@@ -909,6 +910,11 @@ public abstract class AbstractFileInputOperator<T>
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index ab8bedc..f703a19 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -110,7 +110,7 @@ import com.datatorrent.lib.counters.BasicCounters;
  * @since 2.0.0
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener
+public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class);
 
@@ -262,6 +262,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   private Long expireStreamAfterAccessMillis;
   private final Set<String> filesWithOpenStreams;
 
+  private boolean initializeContext;
+
   /**
    * This input port receives incoming tuples.
    */
@@ -966,13 +968,16 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   @Override
   public void beginWindow(long windowId)
   {
-    try {
-      Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
-      for (FSFilterStreamContext streamContext : openStreams.values()) {
-        streamContext.initializeContext();
+    if (initializeContext) {
+      try {
+        Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
+        for (FSFilterStreamContext streamContext : openStreams.values()) {
+          streamContext.initializeContext();
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+      initializeContext = false;
     }
     currentWindow = windowId;
   }
@@ -980,18 +985,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   @Override
   public void endWindow()
   {
-    try {
-      Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
-      for (FSFilterStreamContext streamContext: openStreams.values()) {
-        long start = System.currentTimeMillis();
-        streamContext.finalizeContext();
-        totalWritingTime += System.currentTimeMillis() - start;
-        //streamContext.resetFilter();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
     if (rotationWindows > 0) {
       if (++rotationCount == rotationWindows) {
         rotationCount = 0;
@@ -1239,6 +1232,24 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+    try {
+      Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
+      for (FSFilterStreamContext streamContext: openStreams.values()) {
+        long start = System.currentTimeMillis();
+        streamContext.finalizeContext();
+        totalWritingTime += System.currentTimeMillis() - start;
+        //streamContext.resetFilter();
+        // Re-initialize context when next window starts after checkpoint
+        initializeContext = true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 945c000..c12becd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -25,17 +25,14 @@ import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
-
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.NameableThreadFactory;
 import com.datatorrent.netlet.util.DTThrowable;
@@ -55,7 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 2.0.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointListener, IdleTimeHandler
+public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointNotificationListener, IdleTimeHandler
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class);
   public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
@@ -125,6 +122,11 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 4bb53e5..b9594b3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -79,7 +79,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
 @Deprecated
-public class FileSplitter implements InputOperator, Operator.CheckpointListener
+public class FileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   protected Long blockSize;
   private int sequenceNo;
@@ -380,6 +380,11 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 2d290cd..745f953 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -76,7 +76,7 @@ import com.datatorrent.api.annotation.Stateless;
  * @since 2.0.0
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
-public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener
+public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   @NotNull
   private WindowDataManager windowDataManager;
@@ -219,6 +219,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   }
 
   @Override
+  public void beforeCheckpoint(long l)
+  {
+  }
+
+  @Override
   public void checkpointed(long l)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index 2b8b58d..72bf63c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -80,7 +80,7 @@ import com.datatorrent.lib.counters.BasicCounters;
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
 public abstract class AbstractJMSInputOperator<T> extends JMSBase
     implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener,
-    Operator.IdleTimeHandler, Operator.CheckpointListener
+    Operator.IdleTimeHandler, Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k
 
@@ -395,6 +395,11 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
index 89df25e..d0f722d 100644
--- a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
@@ -64,7 +64,7 @@ import com.datatorrent.common.util.BaseOperator;
  * @since 3.4.0
  */
 @InterfaceStability.Unstable
-public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
+public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointNotificationListener
 {
   @AutoMetric
   private long tuplesJoinedPerSec;
@@ -226,6 +226,11 @@ public abstract class AbstractJoinOperator<T> extends BaseOperator implements Op
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
   public void checkpointed(long windowId)
   {
     leftStore.getStore().checkpointed(windowId);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 03f3bf6..5b58121 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -243,8 +243,11 @@ public class AbstractFileOutputOperatorTest
    * @param writer The writer to checkpoint.
    * @return new writer.
    */
-  public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer)
+  public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId)
   {
+    if (windowId >= 0) {
+      writer.beforeCheckpoint(windowId);
+    }
     Kryo kryo = new Kryo();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     Output loutput = new Output(bos);
@@ -418,7 +421,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -577,6 +580,7 @@ public class AbstractFileOutputOperatorTest
 
     writer.requestFinalize(EVEN_FILE);
     writer.requestFinalize(ODD_FILE);
+    writer.beforeCheckpoint(1);
     writer.committed(1);
   }
 
@@ -603,6 +607,7 @@ public class AbstractFileOutputOperatorTest
 
     writer.requestFinalize(ODD_FILE);
     writer.requestFinalize(EVEN_FILE);
+    writer.beforeCheckpoint(1);
     writer.committed(1);
   }
 
@@ -672,7 +677,7 @@ public class AbstractFileOutputOperatorTest
     writer.requestFinalize(EVEN_FILE);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(4);
@@ -690,6 +695,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(8);
     writer.input.put(9);
     writer.endWindow();
+    writer.beforeCheckpoint(2);
     writer.committed(2);
   }
 
@@ -815,7 +821,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(2);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(3);
@@ -862,8 +868,8 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(4);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
-    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1);
 
     LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
 
@@ -1128,7 +1134,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1225,7 +1231,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1272,7 +1278,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.process(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
 
     writer.beginWindow(1);
     writer.input.process(2);
@@ -1363,7 +1369,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(3);
     writer.input.put(4);
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1);
 
     writer.input.put(3);
     writer.input.put(4);
@@ -1554,8 +1560,11 @@ public class AbstractFileOutputOperatorTest
         writer.input.put(i);
       }
       writer.endWindow();
-      evenOffsets.add(evenFile.length());
-      oddOffsets.add(oddFile.length());
+      if ((i % 2) == 1) {
+        writer.beforeCheckpoint(i);
+        evenOffsets.add(evenFile.length());
+        oddOffsets.add(oddFile.length());
+      }
     }
 
     writer.teardown();
@@ -1580,6 +1589,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(2);
     writer.input.put(3);
     writer.endWindow();
+    writer.beforeCheckpoint(0);
 
     //failure and restored
     writer.setup(testMeta.testOperatorContext);
@@ -1756,11 +1766,14 @@ public class AbstractFileOutputOperatorTest
       for (int j = 0; j < 1000; ++j) {
         writer.input.put(i);
       }
-      writer.endWindow();
-      evenOffsets.add(evenCounterContext.getCounter());
-      oddOffsets.add(oddCounterContext.getCounter());
-      //evenOffsets.add(evenFile.length());
-      //oddOffsets.add(oddFile.length());
+      //writer.endWindow();
+      if ((i % 2) == 1) {
+        writer.beforeCheckpoint(i);
+        evenOffsets.add(evenCounterContext.getCounter());
+        oddOffsets.add(oddCounterContext.getCounter());
+        //evenOffsets.add(evenFile.length());
+        //oddOffsets.add(oddFile.length());
+      }
     }
 
     writer.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
index 32c32f7..bcb1bc3 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
@@ -70,7 +70,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.input.process("window 0");
     oper.endWindow();
 
-    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper);
+    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0);
 
     oper.beginWindow(1);
     oper.input.process("window 1");
@@ -110,7 +110,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.beginWindow(1);
     oper.input.process("1");
 
-    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper);
+    AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1);
 
     oper.input.process("1");
     oper.teardown();