You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/27 04:46:08 UTC

[2/4] apex-malhar git commit: APEXMALHAR-2063 Made window data manager use file system wal

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 3de6a1b..81f6aa0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -18,324 +18,687 @@
  */
 package org.apache.apex.malhar.lib.wal;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 
 import javax.validation.constraints.NotNull;
 
-import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager;
+import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultimap;
+import com.google.common.primitives.Longs;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
 
 /**
- * An {@link WindowDataManager} that uses FS to persist state.
+ * A {@link WindowDataManager} that uses the file system to persist state for every completed application window.<p/>
+ *
+ * FSWindowDataManager uses {@link FSWindowReplayWAL} to write to files. While saving an artifact corresponding
+ * to a window, the window data manager saves:
+ * <ol>
+ *   <li>Window id</li>
+ *   <li>Artifact</li>
+ * </ol>
+ * In order to ensure that all the entries corresponding to a window id are appended to the same wal part file, the
+ * wal operates in batch mode. In batch mode, the rotation of a wal part is done only after a batch is complete.<br/>
+ * <p/>
+ *
+ * <b>Replaying data of a completed window</b><br/>
+ * The main support that {@link WindowDataManager} provides to input operators is the ability to replay windows which
+ * were completely processed but not checkpointed. This is necessary for making input operators idempotent.<br/>
+ * The {@link FileSystemWAL}, however, ignores any data which is not checkpointed after failure. Therefore,
+ * {@link FSWindowDataManager} cannot rely solely on the state in wal after failures and so during recovery it modifies
+ * the wal state by traversing through the wal files.<br/>
+ * <br/>
+ * {@link IncrementalCheckpointManager}, however, relies only on the checkpointed state and therefore sets
+ * {@link #relyOnCheckpoints} to true. This is because {@link IncrementalCheckpointManager} only saves data per
+ * checkpoint window.
+ * <p/>
+ *
+ * <b>Purging of stale artifacts</b><br/>
+ * When a window gets committed, it indicates that all the operators in the DAG have completely finished processing that
+ * window. This means that the data of this window can be deleted as it will never be requested for replaying.
+ * Operators can invoke {@link #committed(long)} callback of {@link FSWindowDataManager} to trigger deletion of stale
+ * artifacts.<br/>
+ * <p/>
+ *
+ * <b>Dynamic partitioning support provided</b><br/>
+ * An operator can call {@link #partition(int, Set)} to get new instances of {@link FSWindowDataManager} during
+ * re-partitioning. When operator partitions are removed, then one of the new instances will handle the state of
+ * all deleted instances.<br/>
+ * After re-partitioning, the largest completed window is the min of all partitions' largest completed windows.<br/>
+ *
+ * <p/>
+ * At times, after re-partitioning, a physical operator may want to read the data saved by all the partitions for a
+ * completed window id. For example, {@link AbstractFileInputOperator} needs to redistribute files based on the hash
+ * of file-paths and its partition keys, so it reads artifacts saved by all partitions during replay of a completed
+ * window. {@link #retrieveAllPartitions(long)} retrieves the artifacts of all partitions for a completed window.
+ *
  *
  * @since 3.4.0
  */
 public class FSWindowDataManager implements WindowDataManager
 {
-  private static final String DEF_RECOVERY_PATH = "idempotentState";
-
-  protected transient FSStorageAgent storageAgent;
+  private static final String DEF_STATE_PATH = "idempotentState";
+  private static final String WAL_FILE_NAME = "wal";
 
   /**
-   * Recovery path relative to app path where state is saved.
+   * Path where state is saved; relative to the application path unless configured as absolute.
    */
   @NotNull
-  private String recoveryPath;
-
-  private boolean isRecoveryPathRelativeToAppPath = true;
+  private String statePath = DEF_STATE_PATH;
 
-  /**
-   * largest window for which there is recovery data across all physical operator instances.
-   */
-  protected transient long largestRecoveryWindow;
+  private boolean isStatePathRelativeToAppPath = true;
 
   /**
    * This is not null only for one physical instance.<br/>
    * It consists of operator ids which have been deleted but have some state that can be replayed.
-   * Only one of the instances would be handling (modifying) the files that belong to this state.
+   * Only one of the instances would be handling (modifying) the files that belong to this state. <br/>
+   * The value is assigned during partitioning.
    */
-  protected Set<Integer> deletedOperators;
+  private Set<Integer> deletedOperators;
+
+  private boolean repartitioned;
 
   /**
-   * Sorted mapping from window id to all the operators that have state to replay for that window.
+   * Used when it is not necessary to replay every streaming/app window.
+   * Used by {@link IncrementalCheckpointManager}
    */
-  protected final transient TreeMultimap<Long, Integer> replayState;
+  private boolean relyOnCheckpoints;
+  
+  private transient long largestCompletedWindow = Stateless.WINDOW_ID;
+
+  private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
+
+  //operator id -> wals (sorted)
+  private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
+
+  private transient String fullStatePath;
+  private transient int operatorId;
+
+  private final transient Kryo kryo = new Kryo();
 
-  protected transient FileSystem fs;
-  protected transient Path appPath;
+  private transient FileContext fileContext;
 
   public FSWindowDataManager()
   {
-    replayState = TreeMultimap.create();
-    largestRecoveryWindow = Stateless.WINDOW_ID;
-    recoveryPath = DEF_RECOVERY_PATH;
+    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
   }
 
   @Override
   public void setup(Context.OperatorContext context)
   {
-    Configuration configuration = new Configuration();
-    if (isRecoveryPathRelativeToAppPath) {
-      appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+    operatorId = context.getId();
+
+    if (isStatePathRelativeToAppPath) {
+      fullStatePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + statePath;
     } else {
-      appPath = new Path(recoveryPath);
+      fullStatePath = statePath;
     }
 
     try {
-      storageAgent = new FSStorageAgent(appPath.toString(), configuration);
-
-      fs = FileSystem.newInstance(appPath.toUri(), configuration);
-
-      if (fs.exists(appPath)) {
-        FileStatus[] fileStatuses = fs.listStatus(appPath);
-
-        for (FileStatus operatorDirStatus : fileStatuses) {
-          int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
-
-          for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
-            String fileName = status.getPath().getName();
-            if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
-              continue;
-            }
-            long windowId = Long.parseLong(fileName, 16);
-            replayState.put(windowId, operatorId);
-            if (windowId > largestRecoveryWindow) {
-              largestRecoveryWindow = windowId;
-            }
+      fileContext = FileContextUtils.getFileContext(fullStatePath);
+      setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void setupWals(long activationWindow) throws IOException
+  {
+    findFiles(wal, operatorId);
+    configureWal(wal, operatorId, !relyOnCheckpoints);
+
+    if (repartitioned) {
+      createReadOnlyWals();
+      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
+        findFiles(entry.getValue(), entry.getKey());
+        configureWal(entry.getValue(), entry.getKey(), true);
+      }
+    }
+
+    //find largest completed window
+    if (!relyOnCheckpoints) {
+      long completedWindow = findLargestCompletedWindow(wal, null);
+      //committed will not delete temp files so it is possible that when reading from files, a smaller window
+      //than the activation window is found.
+      if (completedWindow > activationWindow) {
+        largestCompletedWindow = completedWindow;
+      }
+      if (wal.getReader().getCurrentPointer() != null) {
+        wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy());
+      }
+    } else {
+      wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer();
+      largestCompletedWindow = wal.getLastCheckpointedWindow();
+    }
+
+    if (repartitioned && largestCompletedWindow > Stateless.WINDOW_ID) {
+      //find the min of max window ids: a downstream will not finish a window until all the upstream have finished it.
+      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
+
+        long completedWindow = Stateless.WINDOW_ID;
+        if (!relyOnCheckpoints) {
+          long window = findLargestCompletedWindow(entry.getValue(), null);
+          if (window > activationWindow) {
+            completedWindow = window;
           }
+        } else {
+          completedWindow = findLargestCompletedWindow(entry.getValue(), activationWindow);
+        }
+
+        if (completedWindow < largestCompletedWindow) {
+          largestCompletedWindow = completedWindow;
         }
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
     }
+
+    //reset readers
+    wal.getReader().seek(wal.walStartPointer);
+    for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+      wal.getReader().seek(wal.walStartPointer);
+    }
+
+    wal.setup();
+    for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+      wal.setup();
+    }
+
   }
 
-  @Override
-  public void save(Object object, int operatorId, long windowId) throws IOException
+  protected void createReadOnlyWals() throws IOException
   {
-    storageAgent.save(object, operatorId, windowId);
+    RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(fullStatePath));
+    while (operatorsIter.hasNext()) {
+      FileStatus status = operatorsIter.next();
+      int operatorId = Integer.parseInt(status.getPath().getName());
+
+      if (operatorId != this.operatorId) {
+        //create read-only wal for other partitions
+        FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
+        readOnlyWals.put(operatorId, wal);
+      }
+    }
   }
 
-  @Override
-  public Object load(int operatorId, long windowId) throws IOException
+  private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean updateWalState) throws IOException
   {
-    Set<Integer> operators = replayState.get(windowId);
-    if (operators == null || !operators.contains(operatorId)) {
-      return null;
+    String operatorDir = fullStatePath + Path.SEPARATOR + operatorId;
+    wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME);
+    wal.fileContext = fileContext;
+
+    if (updateWalState) {
+      if (!wal.fileDescriptors.isEmpty()) {
+        SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet();
+
+        wal.walStartPointer = new FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0);
+
+        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(sortedParts.last()).last();
+        if (last.isTmp) {
+          wal.tempPartFiles.put(last.part, last.filePath.toString());
+        }
+      }
     }
-    return storageAgent.load(operatorId, windowId);
   }
 
-  @Override
-  public void delete(int operatorId, long windowId) throws IOException
+  private void findFiles(FSWindowReplayWAL wal, int operatorId) throws IOException
   {
-    storageAgent.delete(operatorId, windowId);
+    String operatorDir = fullStatePath + Path.SEPARATOR + operatorId;
+    Path operatorPath = new Path(operatorDir);
+    if (fileContext.util().exists(operatorPath)) {
+      RemoteIterator<FileStatus> walFilesIter = fileContext.listStatus(operatorPath);
+
+      while (walFilesIter.hasNext()) {
+        FileStatus fileStatus = walFilesIter.next();
+        FSWindowReplayWAL.FileDescriptor descriptor = FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath());
+        wal.fileDescriptors.put(descriptor.part, descriptor);
+      }
+    }
   }
 
-  @Override
-  public Map<Integer, Object> load(long windowId) throws IOException
+  private long findLargestCompletedWindow(FSWindowReplayWAL wal, Long ceilingWindow) throws IOException
+  {
+    if (!wal.fileDescriptors.isEmpty()) {
+      FileSystemWAL.FileSystemWALReader reader = wal.getReader();
+
+      //to find the largest window, we only need to look at the last file.
+      NavigableSet<Integer> descendingParts = new TreeSet<>(wal.fileDescriptors.keySet()).descendingSet();
+      for (int part : descendingParts) {
+        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(part).last();
+        reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0));
+
+        long endOffset = -1;
+
+        long lastWindow = Stateless.WINDOW_ID;
+        Slice slice = readNext(reader);
+  
+        while (slice != null) {
+          boolean skipComplete = skipNext(reader); //skip the artifact because we need just the largest window id.
+          if (!skipComplete) {
+            //artifact not saved so this window was not finished.
+            break;
+          }
+          long offset = reader.getCurrentPointer().getOffset();
+    
+          long window = Longs.fromByteArray(slice.toByteArray());
+          if (ceilingWindow != null && window > ceilingWindow) {
+            break;
+          }
+          endOffset = offset;
+          lastWindow = window;
+          slice = readNext(reader);  //either null or next window
+        }
+
+        if (endOffset != -1) {
+          wal.walEndPointerAfterRecovery = new FileSystemWAL.FileSystemWALPointer(last.part, endOffset);
+          wal.windowWalParts.put(lastWindow, wal.walEndPointerAfterRecovery.getPartNum());
+          return lastWindow;
+        }
+      }
+    }
+    return Stateless.WINDOW_ID;
+  }
+
+  /**
+   * Helper method that catches IOException while reading from wal to check if an entry was saved completely or not.
+   * @param reader wal reader
+   * @return wal entry
+   */
+  protected Slice readNext(FileSystemWAL.FileSystemWALReader reader)
   {
-    Set<Integer> operators = replayState.get(windowId);
-    if (operators == null) {
+    try {
+      return reader.next();
+    } catch (IOException ex) {
+      //exception while reading wal entry which can be because there may have been failure while persisting an
+      //artifact so this window is not a finished window.
+      try {
+        reader.close();
+      } catch (IOException ioe) {
+        //closing the reader quietly.
+      }
       return null;
     }
-    Map<Integer, Object> data = Maps.newHashMap();
-    for (int operatorId : operators) {
-      data.put(operatorId, load(operatorId, windowId));
+  }
+
+  /**
+   * Helper method that catches IOException while skipping an entry from wal to check if an entry was saved
+   * completely or not.
+   * @param reader wal reader
+   * @return true if skip was successful; false otherwise.
+   */
+  private boolean skipNext(FileSystemWAL.FileSystemWALReader reader)
+  {
+    try {
+      reader.skipNext();
+      return true;
+    } catch (IOException ex) {
+      //exception while skipping wal entry which can be because there may have been failure while persisting an
+      //artifact so this window is not a finished window.
+      try {
+        reader.close();
+      } catch (IOException e) {
+        //closing the reader quietly
+      }
+      return false;
     }
-    return data;
   }
 
-  @Override
-  public long[] getWindowIds(int operatorId) throws IOException
+  private void closeReaders() throws IOException
   {
-    Path operatorPath = new Path(appPath, String.valueOf(operatorId));
-    if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
-      return null;
+    //close all reader streams and remove the read-only wals of partitions that are not pending deletion
+    wal.getReader().close();
+    if (readOnlyWals.size() > 0) {
+      Iterator<Map.Entry<Integer, FSWindowReplayWAL>> walIterator = readOnlyWals.entrySet().iterator();
+      while (walIterator.hasNext()) {
+        Map.Entry<Integer, FSWindowReplayWAL> entry = walIterator.next();
+        entry.getValue().getReader().close();
+
+        int operatorId = entry.getKey();
+        if (deletedOperators == null || !deletedOperators.contains(operatorId)) {
+          //the read only wal can be removed.
+          walIterator.remove();
+        }
+      }
     }
-    return storageAgent.getWindowIds(operatorId);
+  }
+  
+  /**
+   * Save writes 2 entries to the wal: <br/>
+   * <ol>
+   *   <li>window id</li>
+   *   <li>artifact</li>
+   * </ol>
+   * Note: The wal is being used in batch mode so the part file will never be rotated between the 2 entries.<br/>
+   * The wal part file may be rotated after both the entries, when
+   * {@link FileSystemWAL.FileSystemWALWriter#rotateIfNecessary()} is triggered.
+   *
+   * @param object    state
+   * @param windowId  window id
+   * @throws IOException
+   */
+  @Override
+  public void save(Object object, long windowId) throws IOException
+  {
+    closeReaders();
+    FileSystemWAL.FileSystemWALWriter writer = wal.getWriter();
+
+    byte[] windowIdBytes = Longs.toByteArray(windowId);
+    writer.append(new Slice(windowIdBytes));
+    writer.append(toSlice(object));
+    wal.beforeCheckpoint(windowId);
+    wal.windowWalParts.put(windowId, writer.getCurrentPointer().getPartNum());
+    writer.rotateIfNecessary();
   }
 
+  /**
+   * The implementation assumes that artifacts are retrieved in increasing order of window ids. Typically it is used
+   * to replay tuples of successive windows in input operators after failure.
+   * @param windowId      window id
+   * @return saved state for the window id.
+   * @throws IOException
+   */
   @Override
-  public long[] getWindowIds() throws IOException
+  public Object retrieve(long windowId) throws IOException
   {
-    SortedSet<Long> windowIds = replayState.keySet();
-    long[] windowIdsArray = new long[windowIds.size()];
+    return retrieve(wal, windowId);
+  }
 
-    int index = 0;
+  @Override
+  public Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException
+  {
+    if (windowId > largestCompletedWindow) {
+      return null;
+    }
+    Map<Integer, Object> artifacts = Maps.newHashMap();
+    Object artifact = retrieve(wal, windowId);
+    if (artifact != null) {
+      artifacts.put(operatorId, artifact);
+    }
+    if (repartitioned) {
+      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
+        artifact = retrieve(entry.getValue(), windowId);
+        if (artifact != null) {
+          artifacts.put(entry.getKey(), artifact);
+        }
+      }
+    }
+    return artifacts;
+  }
 
-    for (Long windowId: windowIds) {
-      windowIdsArray[index] = windowId;
-      index++;
+  private Object retrieve(FSWindowReplayWAL wal, long windowId) throws IOException
+  {
+    if (windowId > largestCompletedWindow || wal.walEndPointerAfterRecovery == null) {
+      return null;
     }
 
-    return windowIdsArray;
-  }
+    FileSystemWAL.FileSystemWALReader reader = wal.getReader();
+
+    while (reader.getCurrentPointer() == null ||
+        reader.getCurrentPointer().compareTo(wal.walEndPointerAfterRecovery) < 0) {
+      long currentWindow;
 
+      if (wal.retrievedWindow == null) {
+        wal.retrievedWindow = readNext(reader);
+        Preconditions.checkNotNull(wal.retrievedWindow);
+      }
+      currentWindow = Longs.fromByteArray(wal.retrievedWindow.toByteArray());
+
+      if (windowId == currentWindow) {
+        Slice data = readNext(reader);
+        Preconditions.checkNotNull(data, "data is null");
+
+        wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum());
+        wal.retrievedWindow = readNext(reader); //null or next window
+        
+        return fromSlice(data);
+      } else if (windowId < currentWindow) {
+        //no artifact saved corresponding to that window and artifact is not read.
+        return null;
+      } else {
+        //windowId > current window so we skip the data
+        skipNext(reader);
+        wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum());
+
+        wal.retrievedWindow = readNext(reader); //null or next window
+        if (wal.retrievedWindow == null) {
+          //nothing else to read
+          return null;
+        }
+      }
+    }
+    return null;
+  }
+  
   /**
-   * This deletes all the recovery files of window ids <= windowId.
+   * Deletes artifacts for all windows less than or equal to the committed window id.<p/>
+   *
+   * {@link FSWindowDataManager} uses {@link FSWindowReplayWAL} to record data which writes to temp part files.
+   * The temp part files are finalized only when they are rotated. So when a window is committed, artifacts for
+   * windows {@code <=} the committed window may still be in temporary files. These temporary files are needed for
+   * WAL recovery, so we do not alter them. For efficiency, a part file is deleted whole, never partially.<br/>
+   * Therefore, data of a window gets deleted only when it satisfies all the following criteria:
+   * <ul>
+   *   <li>window {@code <=} committed window id</li>
+   *   <li>the part file of the artifact is rotated.</li>
+   *   <li>the part file doesn't contain artifacts for windows greater than the artifact's window to avoid partial
+   *   file deletion.</li>
+   * </ul>
    *
-   * @param operatorId operator id.
-   * @param windowId   the largest window id for which the states will be deleted.
+   * In addition to this we also delete:
+   * <ol>
+   *   <li>Stray temporary files that correspond to completely deleted parts.</li>
+   *   <li>Files of removed partitions, once the committed window exceeds the largest completed window.</li>
+   * </ol>
+   *
+   * @param committedWindowId   window id
    * @throws IOException
    */
   @Override
-  public void deleteUpTo(int operatorId, long windowId) throws IOException
+  public void committed(long committedWindowId) throws IOException
   {
-    //deleting the replay state
-    if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
-      Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
-        long lwindow = windowEntry.getKey();
-        if (lwindow > windowId) {
-          break;
-        }
-        for (Integer loperator : windowEntry.getValue()) {
-
-          if (deletedOperators.contains(loperator)) {
-            storageAgent.delete(loperator, lwindow);
-
-            Path loperatorPath = new Path(appPath, Integer.toString(loperator));
-            if (fs.listStatus(loperatorPath).length == 0) {
-              //The operator was deleted and it has nothing to replay.
-              deletedOperators.remove(loperator);
-              fs.delete(loperatorPath, true);
-            }
-          } else if (loperator == operatorId) {
-            storageAgent.delete(loperator, lwindow);
+    closeReaders();
+    //find the largest window <= committed window id whose part file is finalized.
+    Map.Entry<Long, Integer> largestEntryForDeletion = null;
+
+    Iterator<Map.Entry<Long, Integer>> iterator = wal.windowWalParts.entrySet().iterator();
+
+    while (iterator.hasNext()) {
+      Map.Entry<Long, Integer> entry = iterator.next();
+      //only completely finalized part files are deleted.
+      if (entry.getKey() <= committedWindowId && !wal.tempPartFiles.containsKey(entry.getValue())) {
+        largestEntryForDeletion = entry;
+        iterator.remove();
+      }
+      if (entry.getKey() > committedWindowId) {
+        break;
+      }
+    }
+
+    if (largestEntryForDeletion != null && !wal.windowWalParts.containsValue(
+        largestEntryForDeletion.getValue()) /* no artifacts for higher window present*/) {
+
+      int highestPartToDelete = largestEntryForDeletion.getValue();
+      wal.getWriter().delete(new FileSystemWAL.FileSystemWALPointer(highestPartToDelete + 1, 0));
+
+      //also delete any old stray temp files that correspond to parts <= highestPartToDelete
+      Iterator<Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor>> fileIterator =
+          wal.fileDescriptors.entries().iterator();
+      while (fileIterator.hasNext()) {
+        Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor> entry = fileIterator.next();
+        if (entry.getKey() <= highestPartToDelete && entry.getValue().isTmp) {
+          if (fileContext.util().exists(entry.getValue().filePath)) {
+            fileContext.delete(entry.getValue().filePath, true);
           }
+        } else if (entry.getKey() > highestPartToDelete) {
+          break;
         }
-        iterator.remove();
       }
     }
 
-    if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) {
-      long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
-      Arrays.sort(windowsAfterReplay);
-      for (long lwindow : windowsAfterReplay) {
-        if (lwindow <= windowId) {
-          storageAgent.delete(operatorId, lwindow);
+    //delete data of partitions that have been removed
+    if (deletedOperators != null) {
+      Iterator<Integer> operatorIter = deletedOperators.iterator();
+
+      while (operatorIter.hasNext()) {
+        int deletedOperatorId = operatorIter.next();
+        FSWindowReplayWAL wal = readOnlyWals.get(deletedOperatorId);
+        if (committedWindowId > largestCompletedWindow) {
+          Path operatorDir = new Path(fullStatePath + Path.SEPARATOR + deletedOperatorId);
+
+          if (fileContext.util().exists(operatorDir)) {
+            fileContext.delete(operatorDir, true);
+          }
+          wal.teardown();
+          operatorIter.remove();
+          readOnlyWals.remove(deletedOperatorId);
         }
       }
+
+      if (deletedOperators.isEmpty()) {
+        deletedOperators = null;
+      }
     }
   }
 
-  @Override
-  public long getLargestRecoveryWindow()
+  private Slice toSlice(Object object)
+  {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Output output = new Output(baos);
+    kryo.writeClassAndObject(output, object);
+    output.close();
+    byte[] bytes = baos.toByteArray();
+
+    return new Slice(bytes);
+  }
+
+  protected Object fromSlice(Slice slice)
+  {
+    Input input = new Input(slice.buffer, slice.offset, slice.length);
+    Object object = kryo.readClassAndObject(input);
+    input.close();
+    return object;
+  }
+
+  public long getLargestCompletedWindow()
   {
-    return largestRecoveryWindow;
+    return largestCompletedWindow;
   }
 
   @Override
-  public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+  public List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds)
   {
-    Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
-        "there has to be one idempotent storage manager");
-    org.apache.apex.malhar.lib.wal.FSWindowDataManager deletedOperatorsManager = null;
+    repartitioned = true;
 
+    KryoCloneUtils<FSWindowDataManager> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+
+    FSWindowDataManager[] windowDataManagers = cloneUtils.getClones(newCount);
     if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
-      if (this.deletedOperators == null) {
-        this.deletedOperators = Sets.newHashSet();
-      }
-      this.deletedOperators.addAll(removedOperatorIds);
+      windowDataManagers[0].deletedOperators = removedOperatorIds;
     }
 
-    for (WindowDataManager storageManager : newManagers) {
-
-      org.apache.apex.malhar.lib.wal.FSWindowDataManager lmanager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)storageManager;
-      lmanager.recoveryPath = this.recoveryPath;
-      lmanager.storageAgent = this.storageAgent;
+    List<WindowDataManager> mangers = new ArrayList<>();
+    mangers.addAll(Arrays.asList(windowDataManagers));
+    return mangers;
+  }
 
-      if (lmanager.deletedOperators != null) {
-        deletedOperatorsManager = lmanager;
-      }
-      //only one physical instance can manage deleted operators so clearing this field for rest of the instances.
-      if (lmanager != deletedOperatorsManager) {
-        lmanager.deletedOperators = null;
-      }
+  @Override
+  public void teardown()
+  {
+    wal.teardown();
+    for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+      wal.teardown();
     }
+  }
 
-    if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
-      //Nothing to do
-      return;
-    }
-    if (this.deletedOperators != null) {
-
-      /*If some operators were removed then there needs to be a manager which can clean there state when it is not
-      needed.*/
-      if (deletedOperatorsManager == null) {
-        //None of the managers were handling deleted operators data.
-        deletedOperatorsManager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)newManagers.iterator().next();
-        deletedOperatorsManager.deletedOperators = Sets.newHashSet();
-      }
+  protected void setRelyOnCheckpoints(boolean relyOnCheckpoints)
+  {
+    this.relyOnCheckpoints = relyOnCheckpoints;
+  }
 
-      deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
-    }
+  /**
+   * @return wal instance
+   */
+  protected FSWindowReplayWAL getWal()
+  {
+    return wal;
   }
 
-  @Override
-  public void teardown()
+  @VisibleForTesting
+  public Set<Integer> getDeletedOperators()
   {
-    try {
-      fs.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    if (deletedOperators == null) {
+      return null;
     }
+    return ImmutableSet.copyOf(deletedOperators);
   }
 
   /**
-   * @return recovery path
+   * @return state path
    */
-  public String getRecoveryPath()
+  public String getStatePath()
   {
-    return recoveryPath;
+    return statePath;
   }
 
   /**
-   * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative
-   * to the application path; otherwise it is handled as an absolute path.
+   * Sets the state path. If {@link #isStatePathRelativeToAppPath} is true then this path is
+   * handled relative to the application path; otherwise it is handled as an absolute
+   * path.
    *
-   * @param recoveryPath recovery path
+   * @param statePath state path
    */
-  public void setRecoveryPath(String recoveryPath)
+  public void setStatePath(String statePath)
   {
-    this.recoveryPath = recoveryPath;
+    this.statePath = statePath;
   }
 
   /**
-   * @return true if recovery path is relative to app path; false otherwise.
+   * @return true if state path is relative to app path; false otherwise.
    */
-  public boolean isRecoveryPathRelativeToAppPath()
+  public boolean isStatePathRelativeToAppPath()
   {
-    return isRecoveryPathRelativeToAppPath;
+    return isStatePathRelativeToAppPath;
   }
 
   /**
-   * Specifies whether the recovery path is relative to application path.
+   * Specifies whether the state path is relative to the application path.
    *
-   * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise.
+   * @param statePathRelativeToAppPath true if state path is relative to application path; false
+   *                                      otherwise.
    */
-  public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath)
+  public void setStatePathRelativeToAppPath(boolean statePathRelativeToAppPath)
   {
-    isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath;
+    isStatePathRelativeToAppPath = statePathRelativeToAppPath;
   }
+
+  private static Logger LOG = LoggerFactory.getLogger(FSWindowDataManager.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
new file mode 100644
index 0000000..326c7a3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.TreeMultimap;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A {@link FileSystemWAL} that WindowDataManager uses to save state of every window.
+ */
+public class FSWindowReplayWAL extends FileSystemWAL
+{
+  transient boolean readOnly;
+
+  transient TreeMultimap<Integer, FileDescriptor> fileDescriptors = TreeMultimap.create();
+
+  //all the readers will read to this point while replaying.
+  transient FileSystemWALPointer walEndPointerAfterRecovery;
+  transient Slice retrievedWindow;
+
+  transient TreeMap<Long, Integer>  windowWalParts = new TreeMap<>();
+
+  FSWindowReplayWAL()
+  {
+    super();
+    setInBatchMode(true);
+    setFileSystemWALWriter(new WriterThatFinalizesImmediately(this));
+  }
+
+  FSWindowReplayWAL(boolean readOnly)
+  {
+    this();
+    this.readOnly = readOnly;
+  }
+
+  @Override
+  public void setup()
+  {
+    try {
+      if (getMaxLength() == 0) {
+        setMaxLength(fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize());
+      }
+      if (!readOnly) {
+        getWriter().recover();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("while setup");
+    }
+  }
+  
+  public FileSystemWALPointer getWalEndPointerAfterRecovery()
+  {
+    return walEndPointerAfterRecovery;
+  }
+  
+  /**
+   * Finalizes files just after rotation. Doesn't wait for the window to be committed.
+   */
+  static class WriterThatFinalizesImmediately extends FileSystemWAL.FileSystemWALWriter
+  {
+
+    private WriterThatFinalizesImmediately()
+    {
+      super();
+    }
+
+    protected WriterThatFinalizesImmediately(@NotNull FileSystemWAL fileSystemWal)
+    {
+      super(fileSystemWal);
+    }
+
+    /**
+     * Finalize the file immediately after rotation.
+     * @param partNum        part number
+     * @throws IOException
+     */
+    @Override
+    protected void rotated(int partNum) throws IOException
+    {
+      finalize(partNum);
+    }
+
+    @Override
+    protected void recover() throws IOException
+    {
+      restoreActivePart();
+    }
+  }
+
+  static class FileDescriptor implements Comparable<FileDescriptor>
+  {
+    int part;
+    boolean isTmp;
+    long time;
+    Path filePath;
+
+    static FileDescriptor create(Path filePath)
+    {
+      FileDescriptor descriptor = new FileDescriptor();
+      descriptor.filePath = filePath;
+
+      String name = filePath.getName();
+      String[] parts = name.split("\\.");
+
+      String[] namePart = parts[0].split("_");
+      descriptor.part = Integer.parseInt(namePart[1]);
+
+      if (parts.length == 3) {
+        descriptor.isTmp = true;
+        descriptor.time = Long.parseLong(parts[1]);
+      }
+
+      return descriptor;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof FileDescriptor)) {
+        return false;
+      }
+
+      FileDescriptor that = (FileDescriptor)o;
+
+      if (part != that.part) {
+        return false;
+      }
+      if (isTmp != that.isTmp) {
+        return false;
+      }
+      return time == that.time;
+
+    }
+
+    @Override
+    public int hashCode()
+    {
+      int result = part;
+      result = 31 * result + (isTmp ? 1 : 0);
+      result = 31 * result + (int)(time ^ (time >>> 32));
+      return result;
+    }
+
+    @Override
+    public int compareTo(FileDescriptor o)
+    {
+      if (part < o.part) {
+        return -1;
+      } else if (part > o.part) {
+        return 1;
+      } else {
+        if (isTmp && !o.isTmp) {
+          return -1;
+        } else if (!isTmp && o.isTmp) {
+          return 1;
+        }
+        return Long.compare(time, o.time);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
index f454188..b7d5ba1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
@@ -56,7 +56,7 @@ import com.datatorrent.netlet.util.Slice;
  * <p/>
  * Note:<br/>
  * The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
- * problems. Typically the  WAL Reader will only used in recovery.<br/>
+ * problems. Typically the WAL Reader will only be used in recovery or replay of finished windows.<br/>
  *
  * Also this implementation is thread unsafe- the filesystem wal writer and reader operations should be performed in
  * operator's thread.
@@ -73,7 +73,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
   @Min(0)
   private long maxLength;
 
-  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
+  FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
 
   @NotNull
   private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
@@ -81,21 +81,26 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
   @NotNull
   private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
 
-  //part => tmp file path;
-  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
+  //part => tmp file path;
+  final Map<Integer, String> tempPartFiles = new TreeMap<>();
 
   private long lastCheckpointedWindow = Stateless.WINDOW_ID;
 
+  private boolean hardLimitOnMaxLength;
+
+  private boolean inBatchMode;
+
+  transient FileContext fileContext;
+
   @Override
   public void setup()
   {
     try {
-      FileContext fileContext = FileContextUtils.getFileContext(filePath);
+      fileContext = FileContextUtils.getFileContext(filePath);
       if (maxLength == 0) {
         maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
       }
-      fileSystemWALWriter.open(fileContext);
-      fileSystemWALReader.open(fileContext);
+      fileSystemWALWriter.recover();
     } catch (IOException e) {
       throw new RuntimeException("during setup", e);
     }
@@ -158,6 +163,14 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
   {
     return filePath + "_" + partNumber;
   }
+
+  /**
+   * @return the live (not copied) pointer at which readable WAL data starts.
+   */
+  public FileSystemWALPointer getWalStartPointer()
+  {
+    return walStartPointer;
+  }
 
   @Override
   public FileSystemWALReader getReader()
@@ -227,6 +240,47 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
     this.maxLength = maxLength;
   }
 
+  /**
+   * @return whether the max length is a hard limit for a wal part file.
+   */
+  public boolean isHardLimitOnMaxLength()
+  {
+    return this.hardLimitOnMaxLength;
+  }
+
+  /**
+   * When hard limit on max length is true, then a wal part file will never exceed the max length.<br/>
+   * An entry goes to the next part if adding it to the current part would exceed the max length (not in batch mode).
+   * <p/>
+   * When hard limit on max length is false, then a wal part file can exceed the max length if the last entry makes the
+   * wal part exceed the max length. By default this is set to false.
+   *
+   * @param hardLimitOnMaxLength true to treat the max length as a hard limit; false otherwise.
+   */
+  public void setHardLimitOnMaxLength(boolean hardLimitOnMaxLength)
+  {
+    this.hardLimitOnMaxLength = hardLimitOnMaxLength;
+  }
+
+  /**
+   * @return whether entries are written in batch mode.
+   */
+  protected boolean isInBatchMode()
+  {
+    return this.inBatchMode;
+  }
+
+  /**
+   * When in batch mode, a part file is not rotated while writing entries; it is rotated only by
+   * rotateIfNecessary(), typically after a batch is completed, so all entries of a batch go to the same part file.
+   *
+   * @param inBatchMode true to write in batch mode; false otherwise.
+   */
+  protected void setInBatchMode(boolean inBatchMode)
+  {
+    this.inBatchMode = inBatchMode;
+  }
+
   public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
   {
     private final int partNum;
@@ -261,6 +315,11 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
       return offset;
     }
 
+    public FileSystemWALPointer getCopy()
+    {
+      return new FileSystemWALPointer(partNum, offset);
+    }
+
     @Override
     public String toString()
     {
@@ -273,16 +332,15 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
    */
   public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer>
   {
-    private FileSystemWALPointer currentPointer;
+    private transient FileSystemWALPointer currentPointer;
 
     private transient DataInputStream inputStream;
     private transient Path currentOpenPath;
     private transient boolean isOpenPathTmp;
 
     private final FileSystemWAL fileSystemWAL;
-    private transient FileContext fileContext;
 
-    private FileSystemWALReader()
+    protected FileSystemWALReader()
     {
       //for kryo
       fileSystemWAL = null;
@@ -291,16 +349,10 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
     public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
     {
       this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
-      currentPointer = new FileSystemWALPointer(fileSystemWal.walStartPointer.partNum,
-          fileSystemWal.walStartPointer.offset);
     }
 
-    protected void open(@NotNull FileContext fileContext) throws IOException
-    {
-      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
-    }
-
-    protected void close() throws IOException
+    @Override
+    public void close() throws IOException
     {
       if (inputStream != null) {
         inputStream.close();
@@ -350,9 +402,9 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
         isOpenPathTmp = false;
       }
 
-      LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
-      if (fileContext.util().exists(pathToReadFrom)) {
-        DataInputStream stream = fileContext.open(pathToReadFrom);
+      LOG.debug("filePath to read {} and pointer {}", pathToReadFrom, walPointer);
+      if (fileSystemWAL.fileContext.util().exists(pathToReadFrom)) {
+        DataInputStream stream = fileSystemWAL.fileContext.open(pathToReadFrom);
         if (walPointer.offset > 0) {
           stream.skip(walPointer.offset);
         }
@@ -365,6 +417,20 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
     @Override
     public Slice next() throws IOException
     {
+      return readOrSkip(false);
+    }
+
+    @Override
+    public void skipNext() throws IOException
+    {
+      readOrSkip(true);
+    }
+
+    private Slice readOrSkip(boolean skip) throws IOException
+    {
+      if (currentPointer == null) {
+        currentPointer = fileSystemWAL.walStartPointer.getCopy(); //reads must not advance the wal start pointer
+      }
       do {
         if (inputStream == null) {
           inputStream = getInputStream(currentPointer);
@@ -376,21 +442,36 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
           inputStream = getInputStream(currentPointer);
         }
 
-        if (inputStream != null && currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
+        if (inputStream != null && currentPointer.offset <
+            fileSystemWAL.fileContext.getFileStatus(currentOpenPath).getLen()) {
           int len = inputStream.readInt();
           Preconditions.checkState(len >= 0, "negative length");
 
-          byte[] data = new byte[len];
-          inputStream.readFully(data);
+          if (!skip) {
+            byte[] data = new byte[len];
+            inputStream.readFully(data);
+            currentPointer.offset += len + 4;
+            return new Slice(data);
 
-          currentPointer.offset += data.length + 4;
-          return new Slice(data);
+          } else {
+            long actualSkipped = inputStream.skip(len);
+            if (actualSkipped != len) {
+              throw new IOException("unable to skip " + len);
+            }
+            currentPointer.offset += len + 4;
+            return null;
+          }
         }
       } while (nextSegment());
       close();
       return null;
     }
 
+    public FileSystemWALPointer getCurrentPointer()
+    {
+      return currentPointer;
+    }
+
     @Override
     public FileSystemWALPointer getStartPointer()
     {
@@ -410,52 +491,52 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
     private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
 
     private final FileSystemWAL fileSystemWAL;
-    private transient FileContext fileContext;
 
     private int latestFinalizedPart = -1;
     private int lowestDeletedPart = -1;
 
-    private FileSystemWALWriter()
+    protected FileSystemWALWriter()
     {
       //for kryo
       fileSystemWAL = null;
     }
 
-    public FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal)
+    protected FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal)
     {
       this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
     }
 
-    protected void open(@NotNull FileContext fileContext) throws IOException
+    protected void recover() throws IOException
     {
-      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
-      recover();
+      restoreActivePart();
+      deleteStrayTmpFiles();
     }
 
-    private void recover() throws IOException
+    void restoreActivePart() throws IOException
     {
-      LOG.debug("current point", currentPointer);
+      LOG.debug("restore part {}", currentPointer);
       String tmpFilePath = fileSystemWAL.tempPartFiles.get(currentPointer.getPartNum());
       if (tmpFilePath != null) {
 
-        Path tmpPath = new Path(tmpFilePath);
-        if (fileContext.util().exists(tmpPath)) {
-          LOG.debug("tmp path exists {}", tmpPath);
+        Path inputPath = new Path(tmpFilePath);
+        if (fileSystemWAL.fileContext.util().exists(inputPath)) {
+          LOG.debug("input path exists {}", inputPath);
 
+          //temp file output stream
           outputStream = getOutputStream(new FileSystemWALPointer(currentPointer.partNum, 0));
-          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
+          DataInputStream inputStream = fileSystemWAL.fileContext.open(inputPath);
 
-          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
+          IOUtils.copyPartial(inputStream, currentPointer.offset, outputStream);
 
-          outputStream.flush();
-          //remove old tmp
-          inputStreamOldTmp.close();
-          LOG.debug("delete tmp {}", tmpPath);
-          fileContext.delete(tmpPath, true);
+          flush();
+          inputStream.close();
         }
       }
+    }
 
-      //find all valid path names
+    void deleteStrayTmpFiles() throws IOException
+    {
+      //find all valid path names
       Set<String> validPathNames = new HashSet<>();
       for (Map.Entry<Integer, String> entry : fileSystemWAL.tempPartFiles.entrySet()) {
         if (entry.getKey() <= currentPointer.partNum) {
@@ -468,22 +549,22 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
       //which aren't accounted by tmp files map
       Path walPath = new Path(fileSystemWAL.filePath);
       Path parentWAL = walPath.getParent();
-      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
-        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
+      if (parentWAL != null && fileSystemWAL.fileContext.util().exists(parentWAL)) {
+        RemoteIterator<FileStatus> remoteIterator = fileSystemWAL.fileContext.listStatus(parentWAL);
         while (remoteIterator.hasNext()) {
           FileStatus status = remoteIterator.next();
           String fileName = status.getPath().getName();
           if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
               !validPathNames.contains(fileName)) {
             LOG.debug("delete stray tmp {}", status.getPath());
-            fileContext.delete(status.getPath(), true);
+            fileSystemWAL.fileContext.delete(status.getPath(), true);
           }
         }
       }
-
     }
 
-    protected void close() throws IOException
+    @Override
+    public void close() throws IOException
     {
       if (outputStream != null) {
         outputStream.close();
@@ -499,23 +580,27 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
         outputStream = getOutputStream(currentPointer);
       }
 
-      int entryLength = entry.length + 4;
+      int entrySize = entry.length + 4; //entry bytes plus the int length prefix
+      //with a hard limit, an entry larger than the max length can never fit in a single part
+      if (fileSystemWAL.hardLimitOnMaxLength) {
+        Preconditions.checkArgument(entrySize <= fileSystemWAL.maxLength, "entry too big. increase the max length");
+      }
 
       // rotate if needed
-      if (shouldRotate(entryLength)) {
+      if (fileSystemWAL.hardLimitOnMaxLength && shouldRotate(entrySize) && !fileSystemWAL.inBatchMode) {
         rotate(true);
       }
 
       outputStream.writeInt(entry.length);
       outputStream.write(entry.buffer, entry.offset, entry.length);
-      currentPointer.offset += entryLength;
+      currentPointer.offset += entrySize;
 
-      if (currentPointer.offset == fileSystemWAL.maxLength) {
+      if (currentPointer.offset >= fileSystemWAL.maxLength && !fileSystemWAL.inBatchMode) {
         //if the file is completed then we can rotate it. do not have to wait for next entry
         rotate(false);
       }
 
-      return entryLength;
+      return entrySize;
     }
 
     @Override
@@ -543,9 +628,9 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
         if (i <= latestFinalizedPart) {
           //delete a part only if it is finalized.
           Path partPath = new Path(fileSystemWAL.getPartFilePath(i));
-          if (fileContext.util().exists(partPath)) {
+          if (fileSystemWAL.fileContext.util().exists(partPath)) {
             LOG.debug("delete {}", partPath);
-            fileContext.delete(partPath, true);
+            fileSystemWAL.fileContext.delete(partPath, true);
             lastPartDeleted = i;
           } else {
             break;
@@ -560,18 +645,18 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
       if (pointer.partNum <= latestFinalizedPart && pointer.offset > 0) {
         String part = fileSystemWAL.getPartFilePath(pointer.partNum);
         Path inputPartPath = new Path(part);
-        long length = fileContext.getFileStatus(inputPartPath).getLen();
+        long length = fileSystemWAL.fileContext.getFileStatus(inputPartPath).getLen();
 
         LOG.debug("truncate {} from {} length {}", part, pointer.offset, length);
 
         if (length > pointer.offset) {
 
-          String temp = getTmpFilePath(part);
+          String temp = createTmpFilePath(part);
           Path tmpPart = new Path(temp);
 
-          DataInputStream inputStream = fileContext.open(inputPartPath);
-          DataOutputStream outputStream = fileContext.create(tmpPart, EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
-              Options.CreateOpts.CreateParent.createParent());
+          DataInputStream inputStream = fileSystemWAL.fileContext.open(inputPartPath);
+          DataOutputStream outputStream = fileSystemWAL.fileContext.create(tmpPart,
+              EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
 
           IOUtils.copyPartial(inputStream, pointer.offset, length - pointer.offset, outputStream);
           inputStream.close();
@@ -582,7 +667,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
             fileSystemWAL.walStartPointer.offset = 0;
           }
 
-          fileContext.rename(tmpPart, inputPartPath, Options.Rename.OVERWRITE);
+          fileSystemWAL.fileContext.rename(tmpPart, inputPartPath, Options.Rename.OVERWRITE);
         }
       }
     }
@@ -590,8 +675,8 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
     protected void flush() throws IOException
     {
       if (outputStream != null) {
-        if (fileContext.getDefaultFileSystem() instanceof LocalFs ||
-            fileContext.getDefaultFileSystem() instanceof RawLocalFs) {
+        if (fileSystemWAL.fileContext.getDefaultFileSystem() instanceof LocalFs ||
+            fileSystemWAL.fileContext.getDefaultFileSystem() instanceof RawLocalFs) {
           //until the stream is closed on the local FS, readers don't see any data.
           close();
         } else {
@@ -607,19 +692,38 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
       return currentPointer.offset + entryLength > fileSystemWAL.maxLength;
     }
 
-    protected void rotate(boolean openNextFile) throws IOException
+    void rotate(boolean openNextFile) throws IOException
     {
       flush();
       close();
-      //all parts up to current part can be finalized.
-      pendingFinalization.put(fileSystemWAL.getLastCheckpointedWindow(), currentPointer.partNum);
 
-      LOG.debug("rotate {} to {}", currentPointer.partNum, currentPointer.partNum + 1);
+      int partNum = currentPointer.partNum;
+      LOG.debug("rotate {} to {}", partNum, currentPointer.partNum + 1);
       currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
       if (openNextFile) {
         //if adding the new entry to the file can cause the current file to exceed the max length then it is rotated.
         outputStream = getOutputStream(currentPointer);
       }
+
+      rotated(partNum);
+    }
+
+    /**
+     * In batch mode, rotates the current part once its offset has reached the max length; otherwise does nothing.
+     * @throws IOException if rotating the current part fails
+     */
+    protected void rotateIfNecessary() throws IOException
+    {
+      if (!fileSystemWAL.inBatchMode || currentPointer.offset < fileSystemWAL.maxLength) {
+        return; //not in batch mode, or the current part still has room
+      }
+      rotate(false); //the current part is full; do not open the next part yet
+    }
+
+    protected void rotated(int partNum) throws IOException
+    {
+      //all parts up to the rotated part can be finalized; recorded against the last checkpointed window
+      pendingFinalization.put(fileSystemWAL.getLastCheckpointedWindow(), partNum);
     }
 
     protected void finalizeFiles(long window) throws IOException
@@ -640,14 +744,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
 
           int partToFinalizeTill = entry.getValue();
           for (int i = largestPartAvailable; i <= partToFinalizeTill; i++) {
-            String tmpToFinalize = fileSystemWAL.tempPartFiles.remove(i);
-            Path tmpPath = new Path(tmpToFinalize);
-
-            if (fileContext.util().exists(tmpPath)) {
-              LOG.debug("finalize {} of part {}", tmpToFinalize, i);
-              fileContext.rename(tmpPath, new Path(fileSystemWAL.getPartFilePath(i)), Options.Rename.OVERWRITE);
-              latestFinalizedPart = i;
-            }
+            finalize(i);
           }
           largestPartAvailable = partToFinalizeTill + 1;
         }
@@ -659,37 +756,60 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
       }
     }
 
+    protected void finalize(int partNum) throws IOException
+    {
+      String tmpToFinalize = fileSystemWAL.tempPartFiles.remove(partNum); //null if no tmp file is registered
+      Path tmpPath = tmpToFinalize == null ? null : new Path(tmpToFinalize);
+      if (tmpPath != null && fileSystemWAL.fileContext.util().exists(tmpPath)) {
+        LOG.debug("finalize {} of part {}", tmpPath, partNum);
+        fileSystemWAL.fileContext.rename(tmpPath, new Path(fileSystemWAL.getPartFilePath(partNum)),
+            Options.Rename.OVERWRITE);
+        latestFinalizedPart = partNum;
+      }
+    }
+
     private DataOutputStream getOutputStream(FileSystemWALPointer pointer) throws IOException
     {
       Preconditions.checkArgument(outputStream == null, "output stream is not null");
 
-      if (pointer.offset > 0 && (fileContext.getDefaultFileSystem() instanceof LocalFs ||
-          fileContext.getDefaultFileSystem() instanceof RawLocalFs)) {
-        //on local file system the stream is closed instead of flush so we open it again in append mode if the
-        //offset > 0.
-        return fileContext.create(new Path(fileSystemWAL.tempPartFiles.get(pointer.partNum)),
+      if (pointer.offset > 0 && (fileSystemWAL.fileContext.getDefaultFileSystem() instanceof LocalFs ||
+          fileSystemWAL.fileContext.getDefaultFileSystem() instanceof RawLocalFs)) {
+        //On local file system the stream is always closed and never flushed so we open it again in append mode if the
+        //offset > 0. This block is entered only when appending to wal while writing on local fs.
+        return fileSystemWAL.fileContext.create(new Path(fileSystemWAL.tempPartFiles.get(pointer.partNum)),
             EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), Options.CreateOpts.CreateParent.createParent());
       }
 
       String partFile = fileSystemWAL.getPartFilePath(pointer.partNum);
-      String tmpFilePath = getTmpFilePath(partFile);
+      String tmpFilePath = createTmpFilePath(partFile);
       fileSystemWAL.tempPartFiles.put(pointer.partNum, tmpFilePath);
 
       Preconditions.checkArgument(pointer.offset == 0, "offset > 0");
       LOG.debug("open {} => {}", pointer.partNum, tmpFilePath);
-      outputStream = fileContext.create(new Path(tmpFilePath),
+      outputStream = fileSystemWAL.fileContext.create(new Path(tmpFilePath),
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
       return outputStream;
     }
 
+    //visible to WindowDataManager
+    FileSystemWALPointer getCurrentPointer()
+    {
+      return currentPointer;
+    }
+
+    //visible to WindowDataManager
+    void setCurrentPointer(@NotNull FileSystemWALPointer pointer)
+    {
+      this.currentPointer = Preconditions.checkNotNull(pointer, "pointer");
+    }
   }
 
-  private static String getTmpFilePath(String filePath)
+  private static String createTmpFilePath(String filePath)
   {
     return filePath + '.' + System.currentTimeMillis() + TMP_EXTENSION;
   }
 
-  private static final String TMP_EXTENSION = ".tmp";
+  static final String TMP_EXTENSION = ".tmp";
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSystemWAL.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
index 45432d5..da21a6d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
@@ -51,7 +51,7 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    * Provides iterator like interface to read entries from the WAL.
    * @param <P> type of Pointer in the WAL
    */
-  interface WALReader<P>
+  interface WALReader<P> extends AutoCloseable
   {
     /**
      * Seek to middle of the WAL. This is used primarily during recovery,
@@ -66,6 +66,11 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
     Slice next() throws IOException;
 
     /**
+     * Skips the next entry in the WAL.
+     */
+    void skipNext() throws IOException;
+
+    /**
      * Returns the start pointer from which data is available to read.<br/>
      * WAL Writer supports purging of aged data so the start pointer will change over time.
      *
@@ -78,7 +83,7 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    * Provide method to write entries to the WAL.
    * @param <P> type of Pointer in the WAL
    */
-  interface WALWriter<P>
+  interface WALWriter<P> extends AutoCloseable
   {
     /**
      * Write an entry to the WAL

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
index a1917a6..2622a0d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
@@ -19,37 +19,61 @@
 package org.apache.apex.malhar.lib.wal;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
-import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 
 /**
- * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window.
- * An idempotent agent cannot make any guarantees about the tuples emitted in the application window which fails.
+ * WindowDataManager manages the state of an operator for every application window. It can be used to replay tuples
+ * in the input operator after re-deployment for a window which was not check-pointed but whose processing was
+ * completed before the failure.<br/>
+ *
+ * However, it cannot make any guarantees about the tuples emitted in the application window during which the operator
+ * failed.<br/>
  *
  * The order of tuples is guaranteed for ordered input sources.
  *
- * <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow
+ * <b>Important:</b> In order for a WindowDataManager to function correctly it cannot allow
  * checkpoints to occur within an application window and checkpoints must be aligned with
  * application window boundaries.
  *
  * @since 2.0.0
  */
-public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext>
+public interface WindowDataManager extends Component<Context.OperatorContext>
 {
   /**
-   * Gets the largest window for which there is recovery data.
+   * Save the state for a window id.
+   * @param object    state
+   * @param windowId  window id
+   * @throws IOException
+   */
+  void save(Object object, long windowId) throws IOException;
+
+  /**
+   * Gets the object saved for the provided window id. <br/>
+   * Typically it is used to replay tuples of successive windows in input operators after failure.
+   *
+   * @param windowId window id
+   * @return saved state for the window id.
+   * @throws IOException
+   */
+  Object retrieve(long windowId) throws IOException;
+
+  /**
+   * Gets the largest window which was completed.
    * @return Returns the window id
    */
-  long getLargestRecoveryWindow();
+  long getLargestCompletedWindow();
 
   /**
+   * Fetches the state saved for a window id for all the partitions.
+   * <p/>
    * When an operator can partition itself dynamically then there is no guarantee that an input state which was being
    * handled by one instance previously will be handled by the same instance after partitioning. <br/>
    * For eg. An {@link AbstractFileInputOperator} instance which reads a File X till offset l (not check-pointed) may no
@@ -58,39 +82,27 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
    * The new instance wouldn't know from what point to read the File X unless it reads the idempotent storage of all the
    * operators for the window being replayed and fix it's state.
    *
-   * @param windowId window id.
-   * @return mapping of operator id to the corresponding state
+   * @param windowId window id
+   * @return saved state per operator partition for the given window.
    * @throws IOException
    */
-  Map<Integer, Object> load(long windowId) throws IOException;
+  Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException;
 
   /**
-   * Delete the artifacts of the operator for windows <= windowId.
+   * Delete the artifacts for windows <= windowId.
    *
-   * @param operatorId operator id
    * @param windowId   window id
    * @throws IOException
    */
-  void deleteUpTo(int operatorId, long windowId) throws IOException;
+  void committed(long windowId) throws IOException;
 
   /**
-   * This informs the idempotent storage manager that operator is partitioned so that it can set properties and
-   * distribute state.
+   * Creates new window data managers during repartitioning.
    *
-   * @param newManagers        all the new idempotent storage managers.
+   * @param newCount count of new window data managers.
    * @param removedOperatorIds set of operator ids which were removed after partitioning.
    */
-  void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
-
-  /**
-   * Returns an array of windowIds for which data was stored by atleast one partition. The array
-   * of winodwIds is sorted.
-   *
-   * @return An array of windowIds for which data was stored by atleast one partition. The array
-   * of winodwIds is sorted.
-   * @throws IOException
-   */
-  long[] getWindowIds() throws IOException;
+  List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds);
 
   /**
    * This {@link WindowDataManager} will never do recovery. This is a convenience class so that operators
@@ -98,21 +110,25 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
    */
   class NoopWindowDataManager implements WindowDataManager
   {
-    @Override
-    public long getLargestRecoveryWindow()
+    public long getLargestCompletedWindow()
     {
       return Stateless.WINDOW_ID;
     }
 
     @Override
-    public Map<Integer, Object> load(long windowId) throws IOException
+    public Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException
     {
       return null;
     }
 
     @Override
-    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+    public List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds)
     {
+      List<WindowDataManager> managers = new ArrayList<>();
+      for (int i = 0; i < newCount; i++) {
+        managers.add(new NoopWindowDataManager());
+      }
+      return managers;
     }
 
     @Override
@@ -126,36 +142,19 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
     }
 
     @Override
-    public void save(Object object, int operatorId, long windowId) throws IOException
+    public void save(Object object, long windowId) throws IOException
     {
     }
 
     @Override
-    public Object load(int operatorId, long windowId) throws IOException
+    public Object retrieve(long windowId) throws IOException
     {
       return null;
     }
 
     @Override
-    public void delete(int operatorId, long windowId) throws IOException
-    {
-    }
-
-    @Override
-    public void deleteUpTo(int operatorId, long windowId) throws IOException
-    {
-    }
-
-    @Override
-    public long[] getWindowIds(int operatorId) throws IOException
-    {
-      return new long[0];
-    }
-
-    @Override
-    public long[] getWindowIds() throws IOException
+    public void committed(long windowId) throws IOException
     {
-      return new long[0];
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index 2f3f356..0f9a7c9 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -178,8 +178,8 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
   public void testRecovery() throws IOException
   {
     int operatorId = 1;
-    when(windowDataManagerMock.getLargestRecoveryWindow()).thenReturn(1L);
-    when(windowDataManagerMock.load(operatorId, 1)).thenReturn(new MutablePair<Integer, Integer>(0, 4));
+    when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L);
+    when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<Integer, Integer>(0, 4));
 
     insertEvents(10, true, 0);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 7990049..e1f23d1 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -617,7 +616,7 @@ public class AbstractFileInputOperatorTest
 
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     FSWindowDataManager manager = new FSWindowDataManager();
-    manager.setRecoveryPath(testMeta.dir + "/recovery");
+    manager.setStatePath(testMeta.dir + "/recovery");
 
     oper.setWindowDataManager(manager);
 
@@ -666,7 +665,7 @@ public class AbstractFileInputOperatorTest
 
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     FSWindowDataManager manager = new FSWindowDataManager();
-    manager.setRecoveryPath(testMeta.dir + "/recovery");
+    manager.setStatePath(testMeta.dir + "/recovery");
 
     oper.setWindowDataManager(manager);
 
@@ -709,7 +708,7 @@ public class AbstractFileInputOperatorTest
 
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     FSWindowDataManager manager = new FSWindowDataManager();
-    manager.setRecoveryPath(testMeta.dir + "/recovery");
+    manager.setStatePath(testMeta.dir + "/recovery");
     oper.setEmitBatchSize(5);
 
     oper.setWindowDataManager(manager);
@@ -772,7 +771,7 @@ public class AbstractFileInputOperatorTest
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
 
     FSWindowDataManager manager = new FSWindowDataManager();
-    manager.setRecoveryPath(testMeta.dir + "/recovery");
+    manager.setStatePath(testMeta.dir + "/recovery");
 
     oper.setWindowDataManager(manager);
 
@@ -817,12 +816,12 @@ public class AbstractFileInputOperatorTest
   }
 
   @Test
-  public void testIdempotentStorageManagerPartitioning() throws Exception
+  public void testWindowDataManagerPartitioning() throws Exception
   {
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
-    oper.setWindowDataManager(new TestStorageManager());
+    oper.setWindowDataManager(new FSWindowDataManager());
     oper.operatorId = 7;
 
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -838,15 +837,15 @@ public class AbstractFileInputOperatorTest
     Assert.assertEquals(2, newPartitions.size());
     Assert.assertEquals(1, oper.getCurrentPartitions());
 
-    List<TestStorageManager> storageManagers = Lists.newLinkedList();
+    List<FSWindowDataManager> storageManagers = Lists.newLinkedList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      storageManagers.add((TestStorageManager)p.getPartitionedInstance().getWindowDataManager());
+      storageManagers.add((FSWindowDataManager)p.getPartitionedInstance().getWindowDataManager());
     }
     Assert.assertEquals("count of storage managers", 2, storageManagers.size());
 
     int countOfDeleteManagers = 0;
-    TestStorageManager deleteManager = null;
-    for (TestStorageManager storageManager : storageManagers) {
+    FSWindowDataManager deleteManager = null;
+    for (FSWindowDataManager storageManager : storageManagers) {
       if (storageManager.getDeletedOperators() != null) {
         countOfDeleteManagers++;
         deleteManager = storageManager;
@@ -858,17 +857,6 @@ public class AbstractFileInputOperatorTest
     Assert.assertEquals("deleted operators", Sets.newHashSet(7), deleteManager.getDeletedOperators());
   }
 
-  private static class TestStorageManager extends FSWindowDataManager
-  {
-    Set<Integer> getDeletedOperators()
-    {
-      if (deletedOperators != null) {
-        return ImmutableSet.copyOf(deletedOperators);
-      }
-      return null;
-    }
-  }
-
   /** scanner to extract partition id from start of the filename */
   static class MyScanner extends AbstractFileInputOperator.DirectoryScanner
   {