Posted to commits@apex.apache.org by th...@apache.org on 2016/03/25 14:18:25 UTC

[3/4] incubator-apex-malhar git commit: APEXMALHAR-1897 added managed state

APEXMALHAR-1897 added managed state


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a8fbcac6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a8fbcac6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a8fbcac6

Branch: refs/heads/master
Commit: a8fbcac6236e4130cef1e83830e944c4788bbca4
Parents: 5373a3c
Author: Chandni Singh <cs...@apache.org>
Authored: Sun Dec 13 03:13:08 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Mar 25 00:04:04 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/fileaccess/FileAccess.java  |   3 +
 .../lib/fileaccess/FileAccessFSImpl.java        |  11 +-
 .../apex/malhar/lib/state/BucketedState.java    |  72 +++
 .../lib/state/TimeSlicedBucketedState.java      | 104 ++++
 .../state/managed/AbstractManagedStateImpl.java | 583 +++++++++++++++++++
 .../apex/malhar/lib/state/managed/Bucket.java   | 525 +++++++++++++++++
 .../lib/state/managed/BucketsFileSystem.java    | 566 ++++++++++++++++++
 .../managed/IncrementalCheckpointManager.java   | 213 +++++++
 .../malhar/lib/state/managed/ManagedState.java  |  32 +
 .../state/managed/ManagedStateComponent.java    |  36 ++
 .../lib/state/managed/ManagedStateContext.java  |  38 ++
 .../lib/state/managed/ManagedStateImpl.java     | 103 ++++
 .../lib/state/managed/ManagedTimeStateImpl.java | 103 ++++
 .../managed/ManagedTimeUnifiedStateImpl.java    | 213 +++++++
 .../malhar/lib/state/managed/StateTracker.java  | 194 ++++++
 .../lib/state/managed/TimeBucketAssigner.java   | 242 ++++++++
 .../malhar/lib/state/managed/package-info.java  |  22 +
 .../apex/malhar/lib/state/package-info.java     |  22 +
 .../state/managed/BucketsFileSystemTest.java    | 166 ++++++
 .../lib/state/managed/DefaultBucketTest.java    | 203 +++++++
 .../IncrementalCheckpointManagerTest.java       | 196 +++++++
 .../lib/state/managed/ManagedStateImplTest.java | 182 ++++++
 .../state/managed/ManagedStateTestUtils.java    | 141 +++++
 .../state/managed/ManagedTimeStateImplTest.java | 151 +++++
 .../ManagedTimeUnifiedStateImplTest.java        | 149 +++++
 .../state/managed/MockManagedStateContext.java  |  91 +++
 .../lib/state/managed/StateTrackerTest.java     | 174 ++++++
 .../state/managed/TimeBucketAssignerTest.java   | 123 ++++
 28 files changed, 4654 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
index d4c7810..f8dd0be 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
@@ -52,8 +52,11 @@ public interface FileAccess extends Closeable
    * @throws IOException
    */
   void rename(long bucketKey, String oldName, String newName) throws IOException;
+
   void delete(long bucketKey, String fileName) throws IOException;
 
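+  /**
+   * Deletes the bucket directory and all of its contents.
+   *
+   * @param bucketKey bucket key
+   * @throws IOException
+   */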
+  void deleteBucket(long bucketKey) throws IOException;
+
   long getFileSize(long bucketKey, String s) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
index 74ab238..a9cfe00 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -135,10 +135,13 @@ public abstract class FileAccessFSImpl implements FileAccess
   public RemoteIterator<LocatedFileStatus> listFiles(long bucketKey) throws IOException
   {
     Path bucketPath = getBucketPath(bucketKey);
-    if (!fs.exists(bucketPath)) {
-      return null;
-    }
-    return fs.listFiles(bucketPath, true);
+    return fs.exists(bucketPath) ? fs.listFiles(bucketPath, true) : null;
+  }
+
+  @Override
+  public void deleteBucket(long bucketKey) throws IOException
+  {
+    fs.delete(getBucketPath(bucketKey), true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
new file mode 100644
index 0000000..a270eb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A state where keys are grouped in buckets.
+ */
+public interface BucketedState
+{
+  /**
+   * An expired value. In implementations where the bucketId is time related, an event can be too old, in which
+   * case the get methods, getSync and getAsync, return this fixed slice instance.<br/>
+   * Comparisons with EXPIRED should be made using <code>==</code> instead of <code>equals</code>.
+   */
+  Slice EXPIRED = new Slice(null, -1, -1);
+
+  /**
+   * Sets the value of the key in bucket identified by bucketId.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key      key (not null)
+   * @param value    value (not null)
+   */
+  void put(long bucketId, @NotNull Slice key, @NotNull Slice value);
+
+  /**
+   * Returns the value of the key in a bucket identified by bucketId. Fetching a key can be expensive if the key
+   * is not in memory and is present on disk. This fetches the key synchronously. <br/>
+   * {@link #getAsync(long, Slice)} is recommended for efficiently reading the value of a key.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key     key (not null)
+   *
+   * @return value of the key if found; null if the key is not found;
+   * {@link #EXPIRED} if the bucketId is time based and very old.
+   */
+  Slice getSync(long bucketId, @NotNull Slice key);
+
+  /**
+   * Returns a future using which the value is obtained.<br/>
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key      key (not null)
+   *
+   * @return value of the key if found; null if the key is not found;
+   * {@link #EXPIRED} if the bucketId is time based and very old.
+   */
+  Future<Slice> getAsync(long bucketId, @NotNull Slice key);
+
+}
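
A minimal usage sketch of BucketedState (an illustration, not part of this commit), assuming
"state" is an already set-up implementation; the bucket id, key and value are hypothetical:

    import java.util.concurrent.Future;

    import org.apache.apex.malhar.lib.state.BucketedState;

    import com.datatorrent.netlet.util.Slice;

    class BucketedStateUsageSketch
    {
      static void writeThenRead(BucketedState state) throws Exception
      {
        Slice key = new Slice("user-1".getBytes());
        state.put(0L, key, new Slice("v1".getBytes()));  //write into bucket 0

        Future<Slice> future = state.getAsync(0L, key);  //recommended read path
        Slice result = future.get();
        if (result == BucketedState.EXPIRED) {           //reference comparison, per the javadoc
          //the bucket id is time based and the entry is too old
        } else if (result != null) {
          //value found
        }
      }
    }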

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
new file mode 100644
index 0000000..55b92a3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A type of bucketed state where a bucket's data is further divided into time buckets. This requires a
+ * time per key to figure out which time bucket a particular key belongs to.
+ * <p/>
+ * The time here is mainly used for purging aged key/value pairs.
+ */
+public interface TimeSlicedBucketedState
+{
+  /**
+   * Sets the value of a key in the bucket identified by bucketId. Time is used to derive which time bucket (within
+   * the main bucket) a key belongs to.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param time    time associated with the key.
+   * @param key     key   (not null)
+   * @param value   value (not null)
+   */
+  void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value);
+
+  /**
+   * Returns the value of the key in the bucket identified by bucketId.<br/>
+   * If the value of the key is not present in the bucket cache then this scans all the time bucket files on disk from
+   * the latest to the oldest.
+   *
+   * It retrieves the value synchronously, which can be expensive.<br/>
+   * {@link #getAsync(long, Slice)} is recommended for efficiently reading the value of a key.
+   *
+   * @param bucketId identifier of the bucket
+   * @param key key (not null)
+   *
+   * @return value of the key if found; null if the key is not found;
+   */
+  Slice getSync(long bucketId, @NotNull Slice key);
+
+
+  /**
+   * Returns the value of key in the bucket identified by bucketId.<br/>
+   * If the value of the key is not present in the bucket cache then this will use the time to derive the time
+   * bucket and just search for the key in a particular time bucket file.<br/>
+   *
+   * It retrieves the value synchronously, which can be expensive.<br/>
+   * {@link #getAsync(long, long, Slice)} is recommended for efficiently reading the value of a key.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param time  time for deriving the time bucket.
+   * @param key   key (not null)
+   *
+   * @return value of the key if found; null if the key is not found; {@link BucketedState#EXPIRED} if the time is old.
+   */
+  Slice getSync(long bucketId, long time, @NotNull Slice key);
+
+  /**
+   * Returns a future using which the value is obtained.<br/>
+   * If the value of the key is not present in the bucket cache then this searches for it in all the time buckets on
+   * disk.<br/>
+   * Time-buckets are looked-up in order from newest to oldest.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key      key (not null)
+   *
+   * @return value of the key if found; null if the key is not found;
+   */
+  Future<Slice> getAsync(long bucketId, @NotNull Slice key);
+
+  /**
+   * Returns a future using which the value is obtained.<br/>
+   * If the value of the key is not present in the bucket cache then this will use the time to derive the time
+   * bucket and just search for the key in a particular time bucket file.<br/>
+   *
+   * @param bucketId  identifier of the bucket.
+   * @param time     time associated with the key.
+   * @param key      key  (not null)
+   *
+   * @return value of the key if found; null if the key is not found; {@link BucketedState#EXPIRED} if time is very old.
+   */
+  Future<Slice> getAsync(long bucketId, long time, @NotNull Slice key);
+}
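
A minimal usage sketch of TimeSlicedBucketedState (an illustration, not part of this commit),
assuming "state" is an already set-up implementation; the bucket id, key and value are hypothetical:

    import java.util.concurrent.Future;

    import org.apache.apex.malhar.lib.state.BucketedState;
    import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState;

    import com.datatorrent.netlet.util.Slice;

    class TimeSlicedUsageSketch
    {
      static void writeThenRead(TimeSlicedBucketedState state) throws Exception
      {
        long now = System.currentTimeMillis();
        Slice key = new Slice("user-1".getBytes());
        state.put(0L, now, key, new Slice("v1".getBytes()));

        //targeted read: the time narrows the search to a single time-bucket file
        Future<Slice> targeted = state.getAsync(0L, now, key);

        //untargeted read: time buckets are scanned from the newest to the oldest
        Future<Slice> scan = state.getAsync(0L, key);

        if (targeted.get() == BucketedState.EXPIRED) {
          //the provided time is older than what the state retains
        }
        scan.get();
      }
    }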

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
new file mode 100644
index 0000000..11db44d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of managed state.<br/>
+ *
+ * The important sub-components here are:
+ * <ol>
+ *   <li>
+ *     {@link #checkpointManager}: writes incremental checkpoints in window files and transfers data from window
+ *     files to bucket files.
+ *   </li>
+ *   <li>
+ *     {@link #bucketsFileSystem}: manages writing/reading from all the buckets. A bucket on disk is further sub-divided
+ *     into time-buckets. This abstracts out updating time-buckets and meta files and reading from them.
+ *   </li>
+ *   <li>
+ *     {@link #timeBucketAssigner}: assigns time-buckets to keys and manages the time boundaries.
+ *   </li>
+ *   <li>
+ *     {@link #stateTracker}: tracks the size of data in memory and requests buckets to free memory when enough memory
+ *     is not available.
+ *   </li>
+ *   <li>
+ *     {@link #fileAccess}: pluggable file-system abstraction.
+ *   </li>
+ * </ol>
+ * <p/>
+ * <b>Differences between different concrete implementations of {@link AbstractManagedStateImpl}</b>
+ * <table>
+ *   <tr>
+ *     <td></td>
+ *     <td>{@link ManagedStateImpl}</td>
+ *     <td>{@link ManagedTimeStateImpl}</td>
+ *     <td>{@link ManagedTimeUnifiedStateImpl}</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Main buckets</td>
+ *     <td>identified by unique ad-hoc long ids that the user provides with the key.</td>
+ *     <td>same as ManagedStateImpl.</td>
+ *     <td>user doesn't provide bucket ids and instead just provides time. Time is used to derive the time buckets
+ *     and these are the main buckets.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Data on disk: data in buckets is persisted on disk, with each bucket's data further divided into
+ *     time-buckets, i.e., {base_path}/{bucketId}/{time-bucket id}</td>
+ *     <td>time-bucket is computed using the system time corresponding to the application window.</td>
+ *     <td>time-bucket is derived from the user provided time.</td>
+ *     <td>time-bucket is derived from the user provided time.
+ *     In this implementation operator id is used to isolate data of different partitions on disk, i.e.,
+ *     {base_path}/{operatorId}/{time-bucket id}</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Bucket partitioning</td>
+ *     <td>bucket belongs to just one partition. Multiple partitions cannot write to the same bucket.</td>
+ *     <td>same as ManagedStateImpl.</td>
+ *     <td>multiple partitions can be working with the same time-bucket since time-bucket is derived from time.
+ *     This works because on disk each partition's data is segregated by the operator id.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Dynamic partitioning</td>
+ *     <td>can support dynamic partitioning by pre-allocating buckets.</td>
+ *     <td>same as ManagedStateImpl.</td>
+ *     <td>will not be able to support dynamic partitioning efficiently.</td>
+ *   </tr>
+ * </table>
+ *
+ */
+public abstract class AbstractManagedStateImpl
+    implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext,
+    TimeBucketAssigner.PurgeListener
+{
+  private long maxMemorySize;
+
+  protected int numBuckets;
+
+  @NotNull
+  private FileAccess fileAccess = new TFileImpl.DTFileImpl();
+  @NotNull
+  protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+
+  protected Bucket[] buckets;
+
+  @Min(1)
+  private int numReaders = 1;
+  @NotNull
+  protected transient ExecutorService readerService;
+
+  @NotNull
+  protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
+
+  @NotNull
+  protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
+
+  protected transient OperatorContext operatorContext;
+
+  @NotNull
+  protected Comparator<Slice> keyComparator = new SliceComparator();
+
+  protected final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
+
+  @NotNull
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration checkStateSizeInterval = Duration.millis(
+      DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue);
+
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration durationPreventingFreeingSpace;
+
+  private transient StateTracker stateTracker = new StateTracker();
+
+  //accessible to StateTracker
+  final transient Object commitLock = new Object();
+
+  protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId =
+      Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create());
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    operatorContext = context;
+    fileAccess.init();
+
+    timeBucketAssigner.setPurgeListener(this);
+
+    //setup all the managed state components
+    timeBucketAssigner.setup(this);
+    checkpointManager.setup(this);
+    bucketsFileSystem.setup(this);
+
+    if (buckets == null) {
+      //create the buckets array only once, at start, when it has not been created yet.
+      numBuckets = getNumBuckets();
+      buckets = new Bucket[numBuckets];
+    }
+    for (Bucket bucket : buckets) {
+      if (bucket != null) {
+        bucket.setup(this);
+      }
+    }
+
+    stateTracker.setup(this);
+    long activationWindow = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
+
+    if (activationWindow != Stateless.WINDOW_ID) {
+      //delete all the wal files with windows > activationWindow.
+      //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
+      try {
+        for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) {
+          if (recoveredWindow <= activationWindow) {
+            @SuppressWarnings("unchecked")
+            Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+                checkpointManager.load(operatorContext.getId(), recoveredWindow);
+            if (recoveredData != null && !recoveredData.isEmpty()) {
+              for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
+                int bucketIdx = prepareBucket(entry.getKey());
+                buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+              }
+            }
+            checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
+                true /*skipWritingToWindowFile*/);
+
+          } else {
+            checkpointManager.delete(operatorContext.getId(), recoveredWindow);
+          }
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("recovering", e);
+      }
+    }
+
+    readerService = Executors.newFixedThreadPool(numReaders, new NameableThreadFactory("managedStateReaders"));
+  }
+
+  /**
+   * Gets the number of buckets, which is required during setup to create the array of buckets.<br/>
+   * {@link ManagedTimeStateImpl} provides the number of buckets, which is injected as a property.<br/>
+   * {@link ManagedTimeUnifiedStateImpl} provides the number of buckets, which is calculated from the time settings.
+   *
+   * @return number of buckets.
+   */
+  public abstract int getNumBuckets();
+
+  public void beginWindow(long windowId)
+  {
+    if (throwable.get() != null) {
+      Throwables.propagate(throwable.get());
+    }
+    timeBucketAssigner.beginWindow(windowId);
+  }
+
+
+  /**
+   * Prepares the bucket and returns its index.
+   * @param bucketId bucket id
+   * @return bucket index
+   */
+  protected int prepareBucket(long bucketId)
+  {
+    stateTracker.bucketAccessed(bucketId);
+    int bucketIdx = getBucketIdx(bucketId);
+
+    Bucket bucket = buckets[bucketIdx];
+    if (bucket == null) {
+      //bucket is not in memory
+      bucket = newBucket(bucketId);
+      bucket.setup(this);
+      buckets[bucketIdx] = bucket;
+    } else if (bucket.getBucketId() != bucketId) {
+      handleBucketConflict(bucketIdx, bucketId);
+    }
+    return bucketIdx;
+  }
+
+  protected void putInBucket(long bucketId, long timeBucket, @NotNull Slice key, @NotNull Slice value)
+  {
+    Preconditions.checkNotNull(key, "key");
+    Preconditions.checkNotNull(value, "value");
+    if (timeBucket != -1) {
+      //data is stored only when the time bucket is valid; -1 indicates an invalid time bucket
+      int bucketIdx = prepareBucket(bucketId);
+      buckets[bucketIdx].put(key, timeBucket, value);
+    }
+  }
+
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key)
+  {
+    Preconditions.checkNotNull(key, "key");
+    int bucketIdx = prepareBucket(bucketId);
+    Bucket bucket = buckets[bucketIdx];
+    synchronized (bucket) {
+      return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
+    }
+  }
+
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key)
+  {
+    Preconditions.checkNotNull(key, "key");
+    int bucketIdx = prepareBucket(bucketId);
+    Bucket bucket = buckets[bucketIdx];
+    synchronized (bucket) {
+      Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY);
+      if (cachedVal != null) {
+        return Futures.immediateFuture(cachedVal);
+      }
+      ValueFetchTask valueFetchTask = new ValueFetchTask(bucket, key, timeBucket, this);
+      tasksPerBucketId.put(bucket.getBucketId(), valueFetchTask);
+      return readerService.submit(valueFetchTask);
+    }
+  }
+
+  protected void handleBucketConflict(int bucketIdx, long newBucketId)
+  {
+    throw new IllegalArgumentException("bucket conflict " + buckets[bucketIdx].getBucketId() + " " + newBucketId);
+  }
+
+  protected int getBucketIdx(long bucketId)
+  {
+    return (int)(bucketId % numBuckets);
+  }
+
+  Bucket getBucket(long bucketId)
+  {
+    return buckets[getBucketIdx(bucketId)];
+  }
+
+  protected Bucket newBucket(long bucketId)
+  {
+    return new Bucket.DefaultBucket(bucketId);
+  }
+
+  public void endWindow()
+  {
+    timeBucketAssigner.endWindow();
+  }
+
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap();
+
+    for (Bucket bucket : buckets) {
+      if (bucket != null) {
+        synchronized (bucket) {
+          Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId);
+          if (!flashDataForBucket.isEmpty()) {
+            flashData.put(bucket.getBucketId(), flashDataForBucket);
+          }
+        }
+      }
+    }
+    if (!flashData.isEmpty()) {
+      try {
+        checkpointManager.save(flashData, operatorContext.getId(), windowId, false);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void committed(long windowId)
+  {
+    synchronized (commitLock) {
+      try {
+        for (Bucket bucket : buckets) {
+          if (bucket != null) {
+            synchronized (bucket) {
+              bucket.committed(windowId);
+            }
+          }
+        }
+        checkpointManager.committed(operatorContext.getId(), windowId);
+      } catch (IOException | InterruptedException e) {
+        throw new RuntimeException("committing " + windowId, e);
+      }
+    }
+  }
+
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void teardown()
+  {
+    checkpointManager.teardown();
+    bucketsFileSystem.teardown();
+    timeBucketAssigner.teardown();
+    readerService.shutdownNow();
+    for (Bucket bucket : buckets) {
+      if (bucket != null) {
+        synchronized (bucket) {
+          bucket.teardown();
+        }
+      }
+    }
+    stateTracker.teardown();
+  }
+
+  @Override
+  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+  {
+    checkpointManager.setLatestExpiredTimeBucket(timeBucket);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext()
+  {
+    return operatorContext;
+  }
+
+  @Override
+  public void setMaxMemorySize(long bytes)
+  {
+    maxMemorySize = bytes;
+  }
+
+  /**
+   *
+   * @return the maximum size in bytes of state cached in memory; exceeding it triggers eviction of committed data.
+   */
+  public long getMaxMemorySize()
+  {
+    return maxMemorySize;
+  }
+
+  /**
+   * Sets the {@link FileAccess} implementation.
+   * @param fileAccess specific implementation of FileAccess.
+   */
+  public void setFileAccess(@NotNull FileAccess fileAccess)
+  {
+    this.fileAccess = Preconditions.checkNotNull(fileAccess);
+  }
+
+  @Override
+  public FileAccess getFileAccess()
+  {
+    return fileAccess;
+  }
+
+  /**
+   * Sets the time bucket assigner. This can be used for plugging any custom time bucket assigner.
+   *
+   * @param timeBucketAssigner a {@link TimeBucketAssigner}
+   */
+  public void setTimeBucketAssigner(@NotNull TimeBucketAssigner timeBucketAssigner)
+  {
+    this.timeBucketAssigner = Preconditions.checkNotNull(timeBucketAssigner);
+  }
+
+  @Override
+  public TimeBucketAssigner getTimeBucketAssigner()
+  {
+    return timeBucketAssigner;
+  }
+
+  @Override
+  public Comparator<Slice> getKeyComparator()
+  {
+    return keyComparator;
+  }
+
+  /**
+   * Sets the key comparator. The keys in the time bucket files on disk are sorted using this comparator.
+   * @param keyComparator key comparator
+   */
+  public void setKeyComparator(@NotNull Comparator<Slice> keyComparator)
+  {
+    this.keyComparator = Preconditions.checkNotNull(keyComparator);
+  }
+
+  public BucketsFileSystem getBucketsFileSystem()
+  {
+    return bucketsFileSystem;
+  }
+
+  /**
+   * @return number of worker threads in the reader service.
+   */
+  public int getNumReaders()
+  {
+    return numReaders;
+  }
+
+  /**
+   * Sets the number of worker threads in the reader service, which is responsible for asynchronously fetching
+   * the values of keys. This should not exceed the number of buckets.
+   *
+   * @param numReaders number of worker threads in the reader service.
+   */
+  public void setNumReaders(int numReaders)
+  {
+    this.numReaders = numReaders;
+  }
+
+  /**
+   * @return regular interval at which the size of state is checked.
+   */
+  public Duration getCheckStateSizeInterval()
+  {
+    return checkStateSizeInterval;
+  }
+
+  /**
+   * Sets the interval at which the size of state is regularly checked.
+   *
+   * @param checkStateSizeInterval regular interval at which the size of state is checked.
+   */
+  public void setCheckStateSizeInterval(@NotNull Duration checkStateSizeInterval)
+  {
+    this.checkStateSizeInterval = Preconditions.checkNotNull(checkStateSizeInterval);
+  }
+
+  /**
+   * @return duration which prevents a bucket from being evicted.
+   */
+  public Duration getDurationPreventingFreeingSpace()
+  {
+    return durationPreventingFreeingSpace;
+  }
+
+  /**
+   * Sets the duration that prevents a bucket from freeing space. For example, if this is set to an hour, then only
+   * buckets which were not accessed in the last hour will be asked to free space.
+   *
+   * @param durationPreventingFreeingSpace time duration
+   */
+  public void setDurationPreventingFreeingSpace(Duration durationPreventingFreeingSpace)
+  {
+    this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
+  }
+
+  static class ValueFetchTask implements Callable<Slice>
+  {
+    private final Bucket bucket;
+    private final long timeBucketId;
+    private final Slice key;
+    private final AbstractManagedStateImpl managedState;
+
+    ValueFetchTask(@NotNull Bucket bucket, @NotNull Slice key, long timeBucketId, AbstractManagedStateImpl managedState)
+    {
+      this.bucket = Preconditions.checkNotNull(bucket);
+      this.timeBucketId = timeBucketId;
+      this.key = Preconditions.checkNotNull(key);
+      this.managedState = Preconditions.checkNotNull(managedState);
+    }
+
+    @Override
+    public Slice call() throws Exception
+    {
+      try {
+        synchronized (bucket) {
+          //a particular bucket should only be handled by one thread at any point in time. Handling a bucket here
+          //involves creating readers for the time buckets and de-serializing a key/value pair from a reader.
+          Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
+          managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
+          return value;
+        }
+      } catch (Throwable t) {
+        managedState.throwable.set(t);
+        throw Throwables.propagate(t);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void setStateTracker(@NotNull StateTracker stateTracker)
+  {
+    this.stateTracker = Preconditions.checkNotNull(stateTracker, "state tracker");
+  }
+
+  @VisibleForTesting
+  void setBucketsFileSystem(@NotNull BucketsFileSystem bucketsFileSystem)
+  {
+    this.bucketsFileSystem = Preconditions.checkNotNull(bucketsFileSystem, "buckets file system");
+  }
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateImpl.class);
+}
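
A configuration sketch for the class above, using only setters visible in this diff and
ManagedTimeStateImpl as the concrete implementation named in the javadoc table; the values and the
no-arg construction are assumptions, not recommendations:

    import org.joda.time.Duration;

    import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;

    import com.datatorrent.lib.fileaccess.TFileImpl;

    class ManagedStateConfigSketch
    {
      static ManagedTimeStateImpl configure()
      {
        ManagedTimeStateImpl state = new ManagedTimeStateImpl();
        state.setMaxMemorySize(64L * 1024 * 1024);       //evict committed data once ~64 MB is held in memory
        state.setNumReaders(2);                          //worker threads for asynchronous value fetches
        state.setCheckStateSizeInterval(Duration.standardMinutes(1));
        state.setDurationPreventingFreeingSpace(Duration.standardHours(1));
        state.setFileAccess(new TFileImpl.DTFileImpl()); //the default used by this class
        return state;
      }
    }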

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
new file mode 100644
index 0000000..b2c1618
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A bucket that groups events.
+ */
+public interface Bucket extends ManagedStateComponent
+{
+  /**
+   * @return bucket id
+   */
+  long getBucketId();
+
+  /**
+   *
+   * @return size of bucket in memory.
+   */
+  long getSizeInBytes();
+
+  /**
+   * Get value of a key.
+   *
+   * @param key        key.
+   * @param timeBucket time bucket of the key if known; -1 otherwise.
+   * @param source     source to read from
+   * @return value of the key.
+   */
+  Slice get(Slice key, long timeBucket, ReadSource source);
+
+  /**
+   * Set value of a key.
+   *
+   * @param key        key.
+   * @param timeBucket timeBucket of the key.
+   * @param value      value of the key.
+   */
+  void put(Slice key, long timeBucket, Slice value);
+
+  /**
+   * Triggers a checkpoint of the bucket and returns the data that has not been checkpointed so far.
+   *
+   * @return non checkpointed data.
+   */
+  Map<Slice, BucketedValue> checkpoint(long windowId);
+
+  /**
+   * Triggers the bucket to commit data up to the provided window id.
+   *
+   * @param windowId window id
+   */
+  void committed(long windowId);
+
+  /**
+   * Triggers the bucket to free memory that has already been persisted in the bucket data files.
+   *
+   * @return amount of memory freed in bytes.
+   * @throws IOException
+   */
+  long freeMemory() throws IOException;
+
+  /**
+   * Allows the bucket to process/cache data which is recovered (from window files) after failure.
+   *
+   * @param windowId recovery window
+   * @param recoveredData recovered data
+   */
+  void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue> recoveredData);
+
+  enum ReadSource
+  {
+    MEMORY,      //state in memory in key/value form
+    READERS,     //file readers from which a key is looked up on disk and its value de-serialized
+    ALL          //both of the above sources
+  }
+
+  class BucketedValue
+  {
+    private long timeBucket;
+    private Slice value;
+
+    protected BucketedValue()
+    {
+    }
+
+    protected BucketedValue(long timeBucket, Slice value)
+    {
+      this.timeBucket = timeBucket;
+      this.value = value;
+    }
+
+    protected long getTimeBucket()
+    {
+      return timeBucket;
+    }
+
+    protected void setTimeBucket(long timeBucket)
+    {
+      this.timeBucket = timeBucket;
+    }
+
+    public Slice getValue()
+    {
+      return value;
+    }
+
+    public void setValue(Slice value)
+    {
+      this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof BucketedValue)) {
+        return false;
+      }
+
+      BucketedValue that = (BucketedValue)o;
+
+      return timeBucket == that.timeBucket && value.equals(that.value);
+
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(timeBucket, value);
+    }
+  }
+
+  /**
+   * Default bucket.<br/>
+   * Not thread-safe.
+   */
+  class DefaultBucket implements Bucket
+  {
+    private final long bucketId;
+
+    //key/value data of the current window that has not been checkpointed yet
+    private transient Map<Slice, BucketedValue> flash = Maps.newConcurrentMap();
+
+    //Data persisted in write ahead logs. window -> key/value data of that window
+    private final transient ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> checkpointedData =
+        new ConcurrentSkipListMap<>();
+
+    //Data persisted in bucket data files
+    private final transient Map<Slice, BucketedValue> committedData = Maps.newConcurrentMap();
+
+    //Data serialized/deserialized from bucket data files: key -> value from latest time bucket on file
+    private final transient Map<Slice, BucketedValue> fileCache = Maps.newConcurrentMap();
+
+    //TimeBucket -> FileReaders
+    private final transient Map<Long, FileAccess.FileReader> readers = Maps.newTreeMap();
+
+    protected transient ManagedStateContext managedStateContext;
+
+    private AtomicLong sizeInBytes = new AtomicLong(0);
+
+    private final transient Slice dummyGetKey = new Slice(null, 0, 0);
+
+    private transient TreeSet<BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
+
+    private DefaultBucket()
+    {
+      //for kryo
+      bucketId = -1;
+    }
+
+    protected DefaultBucket(long bucketId)
+    {
+      this.bucketId = bucketId;
+    }
+
+    @Override
+    public void setup(@NotNull ManagedStateContext managedStateContext)
+    {
+      this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+    }
+
+    @Override
+    public long getBucketId()
+    {
+      return bucketId;
+    }
+
+    @Override
+    public long getSizeInBytes()
+    {
+      return sizeInBytes.longValue();
+    }
+
+    private Slice getFromMemory(Slice key)
+    {
+      //search the cache for key
+      BucketedValue bucketedValue = flash.get(key);
+      if (bucketedValue != null) {
+        return bucketedValue.getValue();
+      }
+
+      for (Long window : checkpointedData.descendingKeySet()) {
+        //traverse the checkpointed data in reverse order
+        bucketedValue = checkpointedData.get(window).get(key);
+        if (bucketedValue != null) {
+          return bucketedValue.getValue();
+        }
+      }
+
+      bucketedValue = committedData.get(key);
+      if (bucketedValue != null) {
+        return bucketedValue.getValue();
+      }
+
+      bucketedValue = fileCache.get(key);
+      if (bucketedValue != null) {
+        return bucketedValue.getValue();
+      }
+
+      return null;
+    }
+
+    private Slice getFromReaders(Slice key, long timeBucket)
+    {
+      try {
+        if (cachedBucketMetas == null) {
+          cachedBucketMetas = managedStateContext.getBucketsFileSystem().getAllTimeBuckets(bucketId);
+        }
+        if (timeBucket != -1) {
+          Slice valSlice = getValueFromTimeBucketReader(key, timeBucket);
+          if (valSlice != null) {
+            if (timeBucket == cachedBucketMetas.first().getTimeBucketId()) {
+              //if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
+              BucketedValue bucketedValue = new BucketedValue(timeBucket, valSlice);
+              fileCache.put(key, bucketedValue);
+            }
+          }
+          return valSlice;
+        } else {
+          //search all the time buckets
+          for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) {
+
+            if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
+              //keys in the time bucket files are sorted so if the first key in the file is greater than the key being
+              //searched, the key will not be present in that file.
+              Slice valSlice = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
+              if (valSlice != null) {
+                BucketedValue bucketedValue = new BucketedValue(immutableTimeBucketMeta.getTimeBucketId(), valSlice);
+                //the time buckets are searched from the latest to the oldest, so the first match is the most
+                //recent value and can safely be put in the file cache.
+                fileCache.put(key, bucketedValue);
+                return valSlice;
+              }
+            }
+          }
+          return null;
+        }
+
+      } catch (IOException e) {
+        throw new RuntimeException("get time-buckets " + bucketId, e);
+      }
+    }
+
+    @Override
+    public Slice get(Slice key, long timeBucket, ReadSource readSource)
+    {
+      switch (readSource) {
+        case MEMORY:
+          return getFromMemory(key);
+        case READERS:
+          return getFromReaders(key, timeBucket);
+        case ALL:
+        default:
+          Slice value = getFromMemory(key);
+          if (value != null) {
+            return value;
+          }
+          return getFromReaders(key, timeBucket);
+      }
+    }
+
+    /**
+     * Returns the value for the key from a time-bucket reader
+     * @param key        key
+     * @param timeBucket time bucket
+     * @return value if the key is found in the time bucket; null otherwise
+     */
+    private Slice getValueFromTimeBucketReader(Slice key, long timeBucket)
+    {
+      FileAccess.FileReader fileReader = readers.get(timeBucket);
+      if (fileReader != null) {
+        return readValue(fileReader, key, timeBucket);
+      }
+      //file reader is not loaded and is null
+      try {
+        if (loadFileReader(timeBucket)) {
+          return readValue(readers.get(timeBucket), key, timeBucket);
+        }
+        return null;
+      } catch (IOException e) {
+        throw new RuntimeException("while loading " + bucketId + ", " + timeBucket, e);
+      }
+    }
+
+    private Slice readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
+    {
+      Slice valSlice = new Slice(null, 0, 0);
+      try {
+        if (fileReader.seek(key)) {
+          fileReader.next(dummyGetKey, valSlice);
+          return valSlice;
+        } else {
+          return null;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("reading " + bucketId + ", " + timeBucket, e);
+      }
+    }
+
+    private boolean loadFileReader(long timeBucketId) throws IOException
+    {
+      BucketsFileSystem.TimeBucketMeta tbm = managedStateContext.getBucketsFileSystem()
+          .getTimeBucketMeta(bucketId, timeBucketId);
+
+      if (tbm != null) {
+        FileAccess.FileReader reader = managedStateContext.getBucketsFileSystem().getReader(bucketId,
+            BucketsFileSystem.getFileName(timeBucketId));
+        readers.put(timeBucketId, reader);
+        sizeInBytes.getAndAdd(tbm.getSizeInBytes());
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void put(Slice key, long timeBucket, Slice value)
+    {
+      BucketedValue bucketedValue = flash.get(key);
+      if (bucketedValue == null) {
+        bucketedValue = new BucketedValue();
+        flash.put(key, bucketedValue);
+        sizeInBytes.getAndAdd(key.length);
+        sizeInBytes.getAndAdd(Long.SIZE);
+      }
+      if (timeBucket > bucketedValue.getTimeBucket()) {
+
+        int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
+        sizeInBytes.getAndAdd(inc);
+        bucketedValue.setTimeBucket(timeBucket);
+        bucketedValue.setValue(value);
+      }
+    }
+
+    @Override
+    public long freeMemory() throws IOException
+    {
+      LOG.debug("free space {}", bucketId);
+      long memoryFreed = 0;
+      for (Map.Entry<Slice, BucketedValue> entry : committedData.entrySet()) {
+        memoryFreed += entry.getKey().length + entry.getValue().getValue().length;
+      }
+      committedData.clear();
+      fileCache.clear();
+      if (cachedBucketMetas != null) {
+
+        for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas) {
+          FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
+          if (reader != null) {
+            memoryFreed += tbm.getSizeInBytes();
+            reader.close();
+          }
+        }
+
+      }
+      sizeInBytes.getAndAdd(-memoryFreed);
+      return memoryFreed;
+    }
+
+    @Override
+    public Map<Slice, BucketedValue> checkpoint(long windowId)
+    {
+      try {
+        //the data is transferred from flash to the check-pointed state in the finally block and flash is
+        //re-initialized.
+        return flash;
+      } finally {
+        checkpointedData.put(windowId, flash);
+        //flash is read by reader threads while the operator thread writes to it, so it must stay a concurrent map
+        flash = Maps.newConcurrentMap();
+      }
+    }
+
+    @Override
+    public void committed(long committedWindowId)
+    {
+      Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator();
+
+      while (stateIterator.hasNext()) {
+        Map.Entry<Long, Map<Slice, BucketedValue>> entry = stateIterator.next();
+
+        long savedWindow = entry.getKey();
+        if (savedWindow <= committedWindowId) {
+          Map<Slice, BucketedValue> bucketData = entry.getValue();
+
+          //removing any stale values from the file cache
+          for (Slice key : bucketData.keySet()) {
+            fileCache.remove(key);
+          }
+
+          for (BucketedValue bucketedValue : bucketData.values()) {
+            FileAccess.FileReader reader = readers.get(bucketedValue.getTimeBucket());
+            if (reader != null) {
+              //closing the file reader of the time bucket if it is in memory, because the time-bucket was modified
+              //and will be re-written by the BucketsFileSystem
+              try {
+                LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket());
+                reader.close();
+              } catch (IOException e) {
+                throw new RuntimeException("closing reader " + bucketId + ", " + bucketedValue.getTimeBucket(), e);
+              }
+              readers.remove(bucketedValue.getTimeBucket());
+            }
+            if (readers.isEmpty()) {
+              break;
+            }
+          }
+          committedData.putAll(bucketData);
+          stateIterator.remove();
+        } else {
+          break;
+        }
+      }
+
+      cachedBucketMetas = null;
+    }
+
+    @Override
+    public void recoveredData(long recoveredWindow, Map<Slice, BucketedValue> data)
+    {
+      checkpointedData.put(recoveredWindow, data);
+    }
+
+    @Override
+    public void teardown()
+    {
+      Set<Long> failureBuckets = Sets.newHashSet();
+      for (Map.Entry<Long, FileAccess.FileReader> entry : readers.entrySet()) {
+        try {
+          LOG.debug("closing reader {} {}", bucketId, entry.getKey());
+          entry.getValue().close();
+        } catch (IOException e) {
+          //will try to close all readers
+          failureBuckets.add(entry.getKey());
+        }
+      }
+      if (!failureBuckets.isEmpty()) {
+        StringBuilder builder = new StringBuilder("teardown of ");
+        builder.append(bucketId).append(" < ");
+        for (Long timeBucket : failureBuckets) {
+          builder.append(timeBucket);
+        }
+        builder.append(">");
+        throw new RuntimeException(builder.toString());
+      }
+    }
+
+    @VisibleForTesting
+    Map<Long, FileAccess.FileReader> getReaders()
+    {
+      return readers;
+    }
+
+    @VisibleForTesting
+    Map<Slice, BucketedValue> getCommittedData()
+    {
+      return committedData;
+    }
+
+    @VisibleForTesting
+    ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> getCheckpointedData()
+    {
+      return checkpointedData;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
+  }
+}
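
A sketch of the bucket lifecycle as it is driven by AbstractManagedStateImpl (an illustration, not
part of this commit); the class is placed in the managed state package only to show the call order:

    package org.apache.apex.malhar.lib.state.managed;

    import java.util.Map;

    import com.datatorrent.netlet.util.Slice;

    class BucketLifecycleSketch
    {
      static void cycle(Bucket bucket, Slice key, Slice value, long timeBucket, long windowId) throws Exception
      {
        bucket.put(key, timeBucket, value);  //goes into the in-memory, un-checkpointed state

        //checkpoint returns the un-checkpointed data; AbstractManagedStateImpl hands this map to the
        //IncrementalCheckpointManager as the incremental checkpoint of the window
        Map<Slice, Bucket.BucketedValue> incremental = bucket.checkpoint(windowId);

        //after the window is committed, data up to it moves to the committed section and file readers
        //of the modified time buckets are closed
        bucket.committed(windowId);

        //memory that is already persisted in the bucket data files can be released under memory pressure
        long freed = bucket.freeMemory();
      }
    }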

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
new file mode 100644
index 0000000..8304fb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeBasedTable;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Persists bucket data on disk and maintains meta information about the buckets.
+ * <p/>
+ *
+ * Each bucket has a meta-data file whose format is:<br/>
+ * <ol>
+ * <li>version of the meta data (int)</li>
+ * <li>total number of time-buckets (int)</li>
+ * <li>For each time bucket
+ * <ol>
+ * <li>time bucket key (long)</li>
+ * <li>size of data (sum of bytes) (long)</li>
+ * <li>last transferred window id (long)</li>
+ * <li>length of the first key in the time-bucket file (int)</li>
+ * <li>first key in the time-bucket file (byte[])</li>
+ * </ol>
+ * </li>
+ * </ol>
+ * <p/>
+ * Meta data information is updated by {@link IncrementalCheckpointManager}. Any updates are restricted to the package.
+ */
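
As a sketch of the meta-data layout just described (an illustration, not part of this commit),
reading a bucket's _META file field by field could look like this:

    import java.io.DataInputStream;
    import java.io.IOException;

    class MetaFileLayoutSketch
    {
      static void read(DataInputStream dis) throws IOException
      {
        int version = dis.readInt();                   //version of the meta data
        int numTimeBuckets = dis.readInt();            //total number of time-buckets
        for (int i = 0; i < numTimeBuckets; i++) {
          long timeBucketKey = dis.readLong();         //time bucket key
          long sizeInBytes = dis.readLong();           //size of data
          long lastTransferredWindow = dis.readLong(); //last transferred window id
          byte[] firstKey = new byte[dis.readInt()];   //length of the first key
          dis.readFully(firstKey);                     //first key in the time-bucket file
        }
      }
    }
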
+public class BucketsFileSystem implements ManagedStateComponent
+{
+  static final String META_FILE_NAME = "_META";
+  private static final int META_FILE_VERSION = 1;
+
+  private final transient TreeBasedTable<Long, Long, MutableTimeBucketMeta> timeBucketsMeta = TreeBasedTable.create();
+
+  //Check-pointed set of all buckets this instance has written to.
+  protected final Set<Long> bucketNamesOnFS = new ConcurrentSkipListSet<>();
+
+  protected transient ManagedStateContext managedStateContext;
+
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+  }
+
+  protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
+  {
+    return managedStateContext.getFileAccess().getWriter(bucketId, fileName);
+  }
+
+  protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
+  {
+    return managedStateContext.getFileAccess().getReader(bucketId, fileName);
+  }
+
+  protected void rename(long bucketId, String fromName, String toName) throws IOException
+  {
+    managedStateContext.getFileAccess().rename(bucketId, fromName, toName);
+  }
+
+  protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
+  {
+    return managedStateContext.getFileAccess().getOutputStream(bucketId, fileName);
+  }
+
+  protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
+  {
+    return managedStateContext.getFileAccess().getInputStream(bucketId, fileName);
+  }
+
+  protected boolean exists(long bucketId, String fileName) throws IOException
+  {
+    return managedStateContext.getFileAccess().exists(bucketId, fileName);
+  }
+
+  protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
+  {
+    return managedStateContext.getFileAccess().listFiles(bucketId);
+  }
+
+  protected void delete(long bucketId, String fileName) throws IOException
+  {
+    managedStateContext.getFileAccess().delete(bucketId, fileName);
+  }
+
+  protected void deleteBucket(long bucketId) throws IOException
+  {
+    managedStateContext.getFileAccess().deleteBucket(bucketId);
+  }
+
+  /**
+   * Saves data to a bucket. The data consists of key/value pairs from all the time-buckets of a particular bucket.
+   *
+   * @param windowId        window id
+   * @param bucketId        bucket id
+   * @param data            data of all time-buckets
+   * @throws IOException
+   */
+  protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException
+  {
+    Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(),
+        managedStateContext.getKeyComparator());
+
+    for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) {
+      long timeBucketId = entry.getValue().getTimeBucket();
+      timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue());
+    }
+
+    for (long timeBucket : timeBucketedKeys.rowKeySet()) {
+      BucketsFileSystem.MutableTimeBucketMeta tbm = getOrCreateTimeBucketMeta(bucketId, timeBucket);
+      addBucketName(bucketId);
+
+      long dataSize = 0;
+      Slice firstKey = null;
+
+      FileAccess.FileWriter fileWriter;
+      String tmpFileName = getTmpFileName();
+      if (tbm.getLastTransferredWindowId() == -1) {
+        //a new time bucket, so we append all the key/value pairs to a new file
+        fileWriter = getWriter(bucketId, tmpFileName);
+
+        for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) {
+          Slice key = entry.getKey();
+          Slice value = entry.getValue().getValue();
+
+          dataSize += key.length;
+          dataSize += value.length;
+
+          fileWriter.append(key.toByteArray(), value.toByteArray());
+          if (firstKey == null) {
+            firstKey = key;
+          }
+        }
+      } else {
+        //The time bucket already exists, so we read its file and re-write it with the merged key/values
+        TreeMap<Slice, Slice> fileData = new TreeMap<>(managedStateContext.getKeyComparator());
+        FileAccess.FileReader fileReader = getReader(bucketId, getFileName(timeBucket));
+        fileReader.readFully(fileData);
+        fileReader.close();
+
+        for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) {
+          fileData.put(entry.getKey(), entry.getValue().getValue());
+        }
+
+        fileWriter = getWriter(bucketId, tmpFileName);
+        for (Map.Entry<Slice, Slice> entry : fileData.entrySet()) {
+          Slice key = entry.getKey();
+          Slice value = entry.getValue();
+
+          dataSize += key.length;
+          dataSize += value.length;
+
+          fileWriter.append(key.toByteArray(), value.toByteArray());
+          if (firstKey == null) {
+            firstKey = key;
+          }
+        }
+      }
+      fileWriter.close();
+      rename(bucketId, tmpFileName, getFileName(timeBucket));
+      tbm.updateTimeBucketMeta(windowId, dataSize, firstKey);
+    }
+
+    updateBucketMetaFile(bucketId);
+  }
+
+  /**
+   * Retrieves the time bucket meta of a particular time-bucket. If the time bucket doesn't exist then a new one
+   * is created.
+   *
+   * @param bucketId     bucket id
+   * @param timeBucketId time bucket id
+   * @return time bucket meta of the time bucket
+   * @throws IOException
+   */
+  @NotNull
+  MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId);
+      if (tbm == null) {
+        tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
+        timeBucketsMeta.put(bucketId, timeBucketId, tbm);
+      }
+      return tbm;
+    }
+  }
+
+  protected void addBucketName(long bucketId)
+  {
+    bucketNamesOnFS.add(bucketId);
+  }
+
+  /**
+   * Returns an immutable view of the time bucket meta of a particular time-bucket.
+   *
+   * @param bucketId     bucket id
+   * @param timeBucketId time bucket id
+   * @return immutable time bucket meta
+   * @throws IOException
+   */
+  @Nullable
+  public TimeBucketMeta getTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId);
+      if (tbm != null) {
+        return tbm.getImmutableTimeBucketMeta();
+      }
+      return null;
+    }
+  }
+
+  /**
+   * This must be called only while holding the lock on {@link #timeBucketsMeta}.
+   *
+   * @param bucketId      bucket id
+   * @param timeBucketId  time bucket id
+   * @return Mutable time bucket meta for a bucket id and time bucket id.
+   * @throws IOException
+   */
+  private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException
+  {
+    MutableTimeBucketMeta tbm = timeBucketsMeta.get(bucketId, timeBucketId);
+    if (tbm != null) {
+      return tbm;
+    }
+    if (exists(bucketId, META_FILE_NAME)) {
+      try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
+        //Load meta info of all the time buckets of the bucket identified by bucketId.
+        loadBucketMetaFile(bucketId, dis);
+      }
+    } else {
+      return null;
+    }
+    return timeBucketsMeta.get(bucketId, timeBucketId);
+  }
+
+  /**
+   * Returns the meta information of all the time buckets in the bucket in descending order - latest to oldest.
+   *
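+   * <p>For example, a reader can walk the time buckets newest-first (a sketch; {@code bfs} is an assumed name):
+   * <pre>{@code
+   * for (TimeBucketMeta tbm : bfs.getAllTimeBuckets(1L)) {
+   *   // the meta of the latest time bucket is visited first
+   * }
+   * }</pre>
+   *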
+   * @param bucketId bucket id
+   * @return all the time buckets in order - latest to oldest
+   * @throws IOException
+   */
+  public TreeSet<TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      TreeSet<TimeBucketMeta> immutableTimeBucketMetas = Sets.newTreeSet(
+          Collections.<TimeBucketMeta>reverseOrder());
+
+      if (timeBucketsMeta.containsRow(bucketId)) {
+        for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
+          immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
+        }
+        return immutableTimeBucketMetas;
+      }
+      if (exists(bucketId, META_FILE_NAME)) {
+        try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
+          //Load meta info of all the time buckets of the bucket identified by bucket id
+          loadBucketMetaFile(bucketId, dis);
+          for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
+            immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
+          }
+          return immutableTimeBucketMetas;
+        }
+      }
+      return immutableTimeBucketMetas;
+    }
+  }
+
+  /**
+   * Loads the bucket meta-file. This must be called only while holding the lock on {@link #timeBucketsMeta}.
+   *
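+   * <p>The meta-file layout, as read below and written by {@link #updateBucketMetaFile(long)}:
+   * <pre>
+   * int    version (META_FILE_VERSION)
+   * int    number of entries
+   * entry: long time-bucket id | long data size | long last transferred window id |
+   *        int first-key length | byte[] first key
+   * </pre>
+   *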
+   * @param bucketId bucket id
+   * @param dis      data input stream
+   * @throws IOException
+   */
+  private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException
+  {
+    int metaDataVersion = dis.readInt();
+
+    if (metaDataVersion == META_FILE_VERSION) {
+      int numberOfEntries = dis.readInt();
+
+      for (int i = 0; i < numberOfEntries; i++) {
+        long timeBucketId = dis.readLong();
+        long dataSize = dis.readLong();
+        long lastTransferredWindow = dis.readLong();
+
+        MutableTimeBucketMeta tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
+
+        int sizeOfFirstKey = dis.readInt();
+        byte[] firstKeyBytes = new byte[sizeOfFirstKey];
+        dis.readFully(firstKeyBytes, 0, firstKeyBytes.length);
+        tbm.updateTimeBucketMeta(lastTransferredWindow, dataSize, new Slice(firstKeyBytes));
+
+        timeBucketsMeta.put(bucketId, timeBucketId, tbm);
+      }
+    }
+  }
+
+  /**
+   * Persists the updated bucket meta to disk.
+   *
+   * @param bucketId bucket id
+   * @throws IOException
+   */
+  void updateBucketMetaFile(long bucketId) throws IOException
+  {
+    Map<Long, MutableTimeBucketMeta> timeBuckets;
+    synchronized (timeBucketsMeta) {
+      timeBuckets = timeBucketsMeta.row(bucketId);
+
+      Preconditions.checkNotNull(timeBuckets, "timeBuckets");
+      String tmpFileName = getTmpFileName();
+
+      try (DataOutputStream dos = getOutputStream(bucketId, tmpFileName)) {
+        dos.writeInt(META_FILE_VERSION);
+        dos.writeInt(timeBuckets.size());
+        for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBuckets.entrySet()) {
+          MutableTimeBucketMeta tbm = entry.getValue();
+          dos.writeLong(tbm.getTimeBucketId());
+          dos.writeLong(tbm.getSizeInBytes());
+          dos.writeLong(tbm.getLastTransferredWindowId());
+          dos.writeInt(tbm.getFirstKey().length);
+          dos.write(tbm.getFirstKey().toByteArray());
+        }
+
+      }
+      rename(bucketId, tmpFileName, META_FILE_NAME);
+    }
+  }
+
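+  /**
+   * Deletes all time-bucket files with time-bucket id less than or equal to the given expired time bucket, and
+   * removes a bucket directory entirely once all of its time-bucket files have expired.
+   *
+   * @param latestExpiredTimeBucket latest expired time-bucket id
+   * @throws IOException
+   */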
+  protected void deleteTimeBucketsLessThanEqualTo(long latestExpiredTimeBucket) throws IOException
+  {
+    LOG.debug("delete files before {}", latestExpiredTimeBucket);
+
+    for (long bucketName : bucketNamesOnFS) {
+      RemoteIterator<LocatedFileStatus> timeBucketsIterator = listFiles(bucketName);
+      boolean emptyBucket = true;
+      while (timeBucketsIterator.hasNext()) {
+        LocatedFileStatus timeBucketStatus = timeBucketsIterator.next();
+
+        String timeBucketStr = timeBucketStatus.getPath().getName();
+        if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) {
+          //ignoring meta and tmp files
+          continue;
+        }
+        long timeBucket = Long.parseLong(timeBucketStr);
+
+        if (timeBucket <= latestExpiredTimeBucket) {
+          LOG.debug("deleting bucket {} time-bucket {}", timeBucket);
+          invalidateTimeBucket(bucketName, timeBucket);
+          delete(bucketName, timeBucketStatus.getPath().getName());
+        } else {
+          emptyBucket = false;
+        }
+      }
+      if (emptyBucket) {
+        LOG.debug("deleting bucket {}", bucketName);
+        deleteBucket(bucketName);
+      }
+    }
+  }
+
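+  /**
+   * Removes the meta of an expired time bucket from the cached table and persists the updated bucket meta-file.
+   *
+   * @param bucketId     bucket id
+   * @param timeBucketId time bucket id
+   * @throws IOException
+   */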
+  void invalidateTimeBucket(long bucketId, long timeBucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      timeBucketsMeta.remove(bucketId, timeBucketId);
+    }
+    updateBucketMetaFile(bucketId);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * This serves the readers - {@link Bucket.DefaultBucket}.
+   * Unlike {@link MutableTimeBucketMeta}, it is immutable and accessible outside the package.
+   */
+  public static class TimeBucketMeta implements Comparable<TimeBucketMeta>
+  {
+    private final long bucketId;
+    private final long timeBucketId;
+    private long lastTransferredWindowId = -1;
+    private long sizeInBytes;
+    private Slice firstKey;
+
+    private TimeBucketMeta()
+    {
+      //for kryo
+      bucketId = -1;
+      timeBucketId = -1;
+    }
+
+    private TimeBucketMeta(long bucketId, long timeBucketId)
+    {
+      this.bucketId = bucketId;
+      this.timeBucketId = timeBucketId;
+    }
+
+    public long getLastTransferredWindowId()
+    {
+      return lastTransferredWindowId;
+    }
+
+    public long getSizeInBytes()
+    {
+      return this.sizeInBytes;
+    }
+
+    public long getBucketId()
+    {
+      return bucketId;
+    }
+
+    public long getTimeBucketId()
+    {
+      return timeBucketId;
+    }
+
+    public Slice getFirstKey()
+    {
+      return firstKey;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof TimeBucketMeta)) {
+        return false;
+      }
+
+      TimeBucketMeta that = (TimeBucketMeta)o;
+
+      return bucketId == that.bucketId && timeBucketId == that.timeBucketId;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(bucketId, timeBucketId);
+    }
+
+    @Override
+    public int compareTo(@NotNull TimeBucketMeta o)
+    {
+      if (bucketId != o.bucketId) {
+        return Long.compare(bucketId, o.bucketId);
+      }
+      return Long.compare(timeBucketId, o.timeBucketId);
+    }
+  }
+
+  /**
+   * Represents time bucket meta information which can be changed.
+   * Updates to an instance and reads/creation of its {@link #immutableTimeBucketMeta} are synchronized
+   * because different threads update and read it.<br/>
+   *
+   * An instance is updated when data from window files is transferred to bucket files, while
+   * {@link Bucket.DefaultBucket} reads the immutable time bucket meta.
+   */
+  static class MutableTimeBucketMeta extends TimeBucketMeta
+  {
+    private transient TimeBucketMeta immutableTimeBucketMeta;
+
+    private volatile boolean changed;
+
+    public MutableTimeBucketMeta(long bucketId, long timeBucketId)
+    {
+      super(bucketId, timeBucketId);
+    }
+
+    synchronized void updateTimeBucketMeta(long lastTransferredWindow, long bytes, @NotNull Slice firstKey)
+    {
+      changed = true;
+      super.lastTransferredWindowId = lastTransferredWindow;
+      super.sizeInBytes = bytes;
+      super.firstKey = Preconditions.checkNotNull(firstKey, "first key");
+    }
+
+    synchronized TimeBucketMeta getImmutableTimeBucketMeta()
+    {
+      if (immutableTimeBucketMeta == null || changed) {
+
+        immutableTimeBucketMeta = new TimeBucketMeta(getBucketId(), getTimeBucketId());
+        immutableTimeBucketMeta.lastTransferredWindowId = getLastTransferredWindowId();
+        immutableTimeBucketMeta.sizeInBytes = getSizeInBytes();
+        immutableTimeBucketMeta.firstKey = getFirstKey();
+        changed = false;
+      }
+      return immutableTimeBucketMeta;
+    }
+
+  }
+
+  protected static String getFileName(long timeBucketId)
+  {
+    return Long.toString(timeBucketId);
+  }
+
+  protected static String getTmpFileName()
+  {
+    return System.currentTimeMillis() + ".tmp";
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(BucketsFileSystem.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
new file mode 100644
index 0000000..a372163
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Queues;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Manages state which is written to files by windows. The state from the window files is then transferred to bucket
+ * data files. This class listens to time expiry events issued by {@link TimeBucketAssigner}.
+ *
+ * This component is also responsible for purging old time buckets.
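+ *
+ * <p>Typical flow, as a sketch ({@code manager} and {@code unsavedBuckets} are assumed names; the calls are
+ * driven by the enclosing managed-state implementation):
+ * <pre>{@code
+ * manager.setup(managedStateContext);                         // starts the background writer thread
+ * manager.save(unsavedBuckets, operatorId, windowId, false);  // at every checkpoint
+ * manager.committed(operatorId, windowId);                    // queues windows for transfer to bucket files
+ * }</pre>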
+ */
+public class IncrementalCheckpointManager extends WindowDataManager.FSWindowDataManager
+    implements ManagedStateComponent
+{
+  private static final String WAL_RELATIVE_PATH = "managed_state";
+
+  //windowId => (bucketId => data)
+  private final transient Map<Long, Map<Long, Map<Slice, Bucket.BucketedValue>>> savedWindows = new
+      ConcurrentSkipListMap<>();
+
+  private transient ExecutorService writerService;
+  private transient volatile boolean transfer;
+
+  private final transient LinkedBlockingQueue<Long> windowsToTransfer = Queues.newLinkedBlockingQueue();
+  private final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
+
+  protected transient ManagedStateContext managedStateContext;
+
+  private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1);
+
+  private transient int waitMillis;
+
+
+  public IncrementalCheckpointManager()
+  {
+    super();
+    setRecoveryPath(WAL_RELATIVE_PATH);
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void setup(@NotNull final ManagedStateContext managedStateContext)
+  {
+    this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+    waitMillis = managedStateContext.getOperatorContext().getValue(Context.OperatorContext.SPIN_MILLIS);
+    super.setup(managedStateContext.getOperatorContext());
+
+    writerService = Executors.newSingleThreadExecutor(new NameableThreadFactory("managed-state-writer"));
+    transfer = true;
+    writerService.submit(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        while (transfer) {
+          transferWindowFiles();
+          if (latestExpiredTimeBucket.get() > -1) {
+            try {
+              managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(
+                  latestExpiredTimeBucket.getAndSet(-1));
+            } catch (IOException e) {
+              throwable.set(e);
+              LOG.debug("delete files", e);
+              Throwables.propagate(e);
+            }
+          }
+        }
+      }
+    });
+  }
+
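+  /**
+   * Polls the queue of committed windows; if a window is available, transfers its data from the window file to the
+   * per-bucket data files and deletes the window file, otherwise waits briefly.
+   */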
+  protected void transferWindowFiles()
+  {
+    try {
+      Long windowId = windowsToTransfer.poll();
+      if (windowId != null) {
+        try {
+          LOG.debug("transfer window {}", windowId);
+          //bucket id => bucket data(key => value, time-buckets)
+          Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = savedWindows.remove(windowId);
+
+          for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
+            managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(),
+                singleBucket.getValue());
+          }
+          storageAgent.delete(managedStateContext.getOperatorContext().getId(), windowId);
+        } catch (Throwable t) {
+          throwable.set(t);
+          LOG.debug("transfer window {}", windowId, t);
+          Throwables.propagate(t);
+        }
+      } else {
+        Thread.sleep(waitMillis);
+      }
+    } catch (InterruptedException ex) {
+      //the sleep can be interrupted by teardown, so there is no need to re-throw the interrupted exception
+      LOG.debug("interrupted", ex);
+    }
+  }
+
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    throw new UnsupportedOperationException("doesn't support saving any object");
+  }
+
+  /**
+   * The unsaved state combines data received in multiple windows. This window data manager persists this data
+   * to disk, keyed by the window id in which the save was requested.
+   * @param unsavedData   unsaved data of all buckets.
+   * @param operatorId    operator id.
+   * @param windowId      window id.
+   * @param skipWriteToWindowFile flag that enables/disables saving the window file.
+   *
+   * @throws IOException
+   */
+  public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, int operatorId, long windowId,
+      boolean skipWriteToWindowFile)
+      throws IOException
+  {
+    Throwable lthrowable;
+    if ((lthrowable = throwable.get()) != null) {
+      LOG.error("Error while transferring");
+      Throwables.propagate(lthrowable);
+    }
+    savedWindows.put(windowId, unsavedData);
+
+    if (!skipWriteToWindowFile) {
+      super.save(unsavedData, operatorId, windowId);
+    }
+  }
+
+  /**
+   * Transfers the data which has been committed up to the given windowId to data files.
+   *
+   * @param operatorId operator id
+   * @param windowId   window id
+   */
+  @SuppressWarnings("UnusedParameters")
+  protected void committed(int operatorId, long windowId) throws IOException, InterruptedException
+  {
+    LOG.debug("data manager committed {}", windowId);
+    for (Long currentWindow : savedWindows.keySet()) {
+      if (currentWindow <= windowId) {
+        LOG.debug("to transfer {}", windowId);
+        windowsToTransfer.add(currentWindow);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    transfer = false;
+    writerService.shutdownNow();
+  }
+
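+  /**
+   * Records the latest expired time bucket; the background writer thread purges all time buckets older than or
+   * equal to it.
+   *
+   * @param timeBucket latest expired time-bucket id
+   */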
+  public void setLatestExpiredTimeBucket(long timeBucket)
+  {
+    latestExpiredTimeBucket.set(timeBucket);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManager.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
new file mode 100644
index 0000000..12928f1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * Managed state has a limit on the amount of data it keeps in memory.
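+ *
+ * <p>A minimal sketch, assuming an implementation such as {@link ManagedStateImpl}:
+ * <pre>{@code
+ * ManagedState state = new ManagedStateImpl();
+ * state.setMaxMemorySize(128L * 1024 * 1024); // cap in-memory state at 128 MB
+ * }</pre>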
+ */
+public interface ManagedState
+{
+  /**
+   * Sets the maximum memory size.
+   * @param bytes max size in bytes.
+   */
+  void setMaxMemorySize(long bytes);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
new file mode 100644
index 0000000..1044e15
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
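+/**
+ * A component of managed state whose lifecycle is driven by a {@link ManagedStateContext} rather than an
+ * operator context.
+ */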
+public interface ManagedStateComponent
+{
+  /**
+   * Callback to setup using managed state context
+   *
+   * @param managedStateContext managed state context
+   */
+  void setup(@NotNull ManagedStateContext managedStateContext);
+
+  /**
+   * Callback to perform teardown.
+   */
+  void teardown();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
new file mode 100644
index 0000000..406fdbd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Comparator;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
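+/**
+ * Provides the collaborating pieces of managed state - file access, operator context, key comparator, buckets file
+ * system and time-bucket assigner - to {@link ManagedStateComponent}s.
+ */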
+public interface ManagedStateContext
+{
+  FileAccess getFileAccess();
+
+  Context.OperatorContext getOperatorContext();
+
+  Comparator<Slice> getKeyComparator();
+
+  BucketsFileSystem getBucketsFileSystem();
+
+  TimeBucketAssigner getTimeBucketAssigner();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
new file mode 100644
index 0000000..4c3cf84
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Basic implementation of {@link AbstractManagedStateImpl} where the system time corresponding to an application
+ * window is used to sub-group the keys of a particular bucket.<br/>
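+ *
+ * <p>A minimal usage sketch ({@code keyBytes} and {@code valueBytes} are assumed byte arrays):
+ * <pre>{@code
+ * ManagedStateImpl state = new ManagedStateImpl();
+ * state.setNumBuckets(4);
+ * // after setup(context), inside an operator:
+ * state.put(0L, new Slice(keyBytes), new Slice(valueBytes));
+ * Slice value = state.getSync(0L, new Slice(keyBytes));
+ * }</pre>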
+ */
+public class ManagedStateImpl extends AbstractManagedStateImpl implements BucketedState
+{
+  private long time = System.currentTimeMillis();
+  private transient long timeIncrement;
+
+  public ManagedStateImpl()
+  {
+    this.numBuckets = 1;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+  }
+
+  @Override
+  public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    putInBucket(bucketId, timeBucket, key, value);
+  }
+
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    return getValueFromBucketSync(bucketId, -1, key);
+  }
+
+  /**
+   * Returns a future from which the value can be obtained.<br/>
+   * If the key is present in the bucket cache, the future has its value set when constructed;
+   * otherwise the value is set once it has been read from the data files, which may take a while.
+   *
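+   * <p>For example (a sketch; blocking on the future is the simplest way to consume it):
+   * <pre>{@code
+   * Future<Slice> f = state.getAsync(0L, key);
+   * Slice value = f.get(); // may throw InterruptedException/ExecutionException
+   * }</pre>
+   *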
+   * @param key key
+   * @return future whose result is the value of the key if found, or null if the key is not found
+   */
+  @Override
+  public Future<Slice> getAsync(long bucketId, @NotNull Slice key)
+  {
+    return getValueFromBucketAsync(bucketId, -1, key);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    time += timeIncrement;
+  }
+
+  @Min(1)
+  @Override
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets.
+   *
+   * @param numBuckets number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+}