You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/25 14:15:24 UTC

[1/3] incubator-apex-malhar git commit: APEXMALHAR-1897 added managed state

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 5373a3cb6 -> a8fbcac62


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
new file mode 100644
index 0000000..952b4f6
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * Tests for {@link TimeBucketAssigner}: serialization, number of buckets, time-bucket computation and the
+ * timing of purge notifications.
+ */
+public class TimeBucketAssignerTest
+{
+  /**
+   * Upper bound on how long {@link #testSlidingOnlyBetweenWindow()} waits for the first purge notification, so that
+   * a missing notification fails the test instead of hanging the build.
+   */
+  private static final long PURGE_WAIT_SECONDS = 30;
+
+  class TestMeta extends TestWatcher
+  {
+    TimeBucketAssigner timeBucketAssigner;
+    MockManagedStateContext mockManagedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      timeBucketAssigner = new TimeBucketAssigner();
+      mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    TimeBucketAssigner deserialized = KryoCloneUtils.cloneObject(testMeta.timeBucketAssigner);
+    Assert.assertNotNull("time bucket assigner", deserialized);
+  }
+
+  @Test
+  public void testNumBuckets()
+  {
+    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
+    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
+
+    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+
+    Assert.assertEquals("num buckets", 2, testMeta.timeBucketAssigner.getNumBuckets());
+    testMeta.timeBucketAssigner.teardown();
+  }
+
+  @Test
+  public void testTimeBucketKey()
+  {
+    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
+    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
+
+    long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
+    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+
+    long time1 = referenceTime - Duration.standardMinutes(2).getMillis();
+    Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketFor(time1));
+
+    long time0 = referenceTime - Duration.standardMinutes(40).getMillis();
+    Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketFor(time0));
+
+    long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis();
+    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketFor(expiredTime));
+    testMeta.timeBucketAssigner.teardown();
+  }
+
+  @Test
+  public void testSlidingOnlyBetweenWindow() throws InterruptedException
+  {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicInteger timesCalled = new AtomicInteger();
+    testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener()
+    {
+      @Override
+      public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+      {
+        timesCalled.getAndIncrement();
+        latch.countDown();
+      }
+    });
+
+    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+    try {
+      testMeta.timeBucketAssigner.beginWindow(0);
+      //bounded wait: fail instead of blocking forever if the purge listener is never called within the window
+      Assert.assertTrue("purge listener called", latch.await(PURGE_WAIT_SECONDS, TimeUnit.SECONDS));
+      testMeta.timeBucketAssigner.endWindow();
+
+      //after the window has ended the number of purge notifications must stay the same
+      int valueBeforeSleep = timesCalled.get();
+      Thread.sleep(1000);
+      Assert.assertEquals("value should not change", valueBeforeSleep, timesCalled.get());
+    } finally {
+      //always release the assigner, even when an assertion above fails
+      testMeta.timeBucketAssigner.teardown();
+    }
+  }
+
+}


[3/3] incubator-apex-malhar git commit: APEXMALHAR-1897 added managed state

Posted by th...@apache.org.
APEXMALHAR-1897 added managed state


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a8fbcac6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a8fbcac6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a8fbcac6

Branch: refs/heads/devel-3
Commit: a8fbcac6236e4130cef1e83830e944c4788bbca4
Parents: 5373a3c
Author: Chandni Singh <cs...@apache.org>
Authored: Sun Dec 13 03:13:08 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Mar 25 00:04:04 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/fileaccess/FileAccess.java  |   3 +
 .../lib/fileaccess/FileAccessFSImpl.java        |  11 +-
 .../apex/malhar/lib/state/BucketedState.java    |  72 +++
 .../lib/state/TimeSlicedBucketedState.java      | 104 ++++
 .../state/managed/AbstractManagedStateImpl.java | 583 +++++++++++++++++++
 .../apex/malhar/lib/state/managed/Bucket.java   | 525 +++++++++++++++++
 .../lib/state/managed/BucketsFileSystem.java    | 566 ++++++++++++++++++
 .../managed/IncrementalCheckpointManager.java   | 213 +++++++
 .../malhar/lib/state/managed/ManagedState.java  |  32 +
 .../state/managed/ManagedStateComponent.java    |  36 ++
 .../lib/state/managed/ManagedStateContext.java  |  38 ++
 .../lib/state/managed/ManagedStateImpl.java     | 103 ++++
 .../lib/state/managed/ManagedTimeStateImpl.java | 103 ++++
 .../managed/ManagedTimeUnifiedStateImpl.java    | 213 +++++++
 .../malhar/lib/state/managed/StateTracker.java  | 194 ++++++
 .../lib/state/managed/TimeBucketAssigner.java   | 242 ++++++++
 .../malhar/lib/state/managed/package-info.java  |  22 +
 .../apex/malhar/lib/state/package-info.java     |  22 +
 .../state/managed/BucketsFileSystemTest.java    | 166 ++++++
 .../lib/state/managed/DefaultBucketTest.java    | 203 +++++++
 .../IncrementalCheckpointManagerTest.java       | 196 +++++++
 .../lib/state/managed/ManagedStateImplTest.java | 182 ++++++
 .../state/managed/ManagedStateTestUtils.java    | 141 +++++
 .../state/managed/ManagedTimeStateImplTest.java | 151 +++++
 .../ManagedTimeUnifiedStateImplTest.java        | 149 +++++
 .../state/managed/MockManagedStateContext.java  |  91 +++
 .../lib/state/managed/StateTrackerTest.java     | 174 ++++++
 .../state/managed/TimeBucketAssignerTest.java   | 123 ++++
 28 files changed, 4654 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
index d4c7810..f8dd0be 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
@@ -52,8 +52,11 @@ public interface FileAccess extends Closeable
    * @throws IOException
    */
   void rename(long bucketKey, String oldName, String newName) throws IOException;
+
   void delete(long bucketKey, String fileName) throws IOException;
 
+  /**
+   * Deletes the bucket identified by bucketKey.
+   * The file-system based implementation removes the bucket directory recursively, i.e. with all its files.
+   *
+   * @param bucketKey key of the bucket to delete.
+   * @throws IOException if the deletion fails.
+   */
+  void deleteBucket(long bucketKey) throws IOException;
+
   long getFileSize(long bucketKey, String s) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
index 74ab238..a9cfe00 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -135,10 +135,13 @@ public abstract class FileAccessFSImpl implements FileAccess
   public RemoteIterator<LocatedFileStatus> listFiles(long bucketKey) throws IOException
   {
     Path bucketPath = getBucketPath(bucketKey);
-    if (!fs.exists(bucketPath)) {
-      return null;
-    }
-    return fs.listFiles(bucketPath, true);
+    //recursive listing of the bucket directory; null (not an empty iterator) when the directory does not exist
+    return fs.exists(bucketPath) ? fs.listFiles(bucketPath, true) : null;
+  }
+
+  @Override
+  public void deleteBucket(long bucketKey) throws IOException
+  {
+    //recursive=true: removes the bucket directory together with everything under it
+    fs.delete(getBucketPath(bucketKey), true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
new file mode 100644
index 0000000..a270eb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A state where keys are grouped in buckets.
+ */
+public interface BucketedState
+{
+  /**
+   * Sentinel value for an expired key. In implementations where the bucketId is time related an event can be too
+   * old, in which case the get methods ({@link #getSync(long, Slice)} and {@link #getAsync(long, Slice)}) return
+   * this fixed slice instance.<br/>
+   * Comparisons with EXPIRED should be made using <code>==</code> instead of <code>equals</code>.
+   */
+  Slice EXPIRED = new Slice(null, -1, -1);
+
+  /**
+   * Sets the value of the key in the bucket identified by bucketId.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key      key (not null)
+   * @param value    value (not null)
+   */
+  void put(long bucketId, @NotNull Slice key, @NotNull Slice value);
+
+  /**
+   * Returns the value of the key in the bucket identified by bucketId. The key is fetched synchronously, which can
+   * be expensive when the key is not in memory and has to be read from disk.<br/>
+   * {@link #getAsync(long, Slice)} is recommended for efficiently reading the value of a key.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key      key (not null)
+   *
+   * @return value of the key if found; null if the key is not found;
+   * {@link #EXPIRED} if the bucketId is time based and very old.
+   */
+  Slice getSync(long bucketId, @NotNull Slice key);
+
+  /**
+   * Returns a future from which the value of the key in the bucket identified by bucketId is obtained.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key      key (not null)
+   *
+   * @return future whose value is the value of the key if found; null if the key is not found;
+   * {@link #EXPIRED} if the bucketId is time based and very old.
+   */
+  Future<Slice> getAsync(long bucketId, @NotNull Slice key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
new file mode 100644
index 0000000..55b92a3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A type of bucketed state where a bucket's data is further divided into time buckets. This requires
+ * time per key to figure out which time bucket a particular key belongs to.
+ * <p/>
+ * The time here is mainly used for purging of aged key/value pair.
+ */
+public interface TimeSlicedBucketedState
+{
+  /**
+   * Sets the value of a key in the bucket identified by bucketId. The time is used to derive which time bucket
+   * (within the main bucket) the key belongs to.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param time     time associated with the key.
+   * @param key      key   (not null)
+   * @param value    value (not null)
+   */
+  void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value);
+
+  /**
+   * Returns the value of the key in the bucket identified by bucketId.<br/>
+   * If the value of the key is not present in the bucket cache then this scans all the time bucket files on disk from
+   * the latest to the oldest.<br/>
+   * The value is retrieved synchronously, which can be expensive.<br/>
+   * {@link #getAsync(long, Slice)} is recommended for efficiently reading the value of a key.
+   *
+   * @param bucketId identifier of the bucket
+   * @param key      key (not null)
+   *
+   * @return value of the key if found; null if the key is not found;
+   */
+  Slice getSync(long bucketId, @NotNull Slice key);
+
+
+  /**
+   * Returns the value of the key in the bucket identified by bucketId.<br/>
+   * If the value of the key is not present in the bucket cache then the time is used to derive the time
+   * bucket, and only that time bucket's file is searched for the key.<br/>
+   * The value is retrieved synchronously, which can be expensive.<br/>
+   * {@link #getAsync(long, long, Slice)} is recommended for efficiently reading the value of a key.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param time     time for deriving the time bucket.
+   * @param key      key (not null)
+   *
+   * @return value of the key if found; null if the key is not found; {@link BucketedState#EXPIRED} if the time is old.
+   */
+  Slice getSync(long bucketId, long time, @NotNull Slice key);
+
+  /**
+   * Returns a future from which the value of the key is obtained.<br/>
+   * If the value of the key is not present in the bucket cache then this searches for it in all the time buckets on
+   * disk.<br/>
+   * Time buckets are looked up in order from newest to oldest.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param key      key (not null)
+   *
+   * @return future whose value is the value of the key if found; null if the key is not found;
+   */
+  Future<Slice> getAsync(long bucketId, @NotNull Slice key);
+
+  /**
+   * Returns a future from which the value of the key is obtained.<br/>
+   * If the value of the key is not present in the bucket cache then the time is used to derive the time
+   * bucket, and only that time bucket's file is searched for the key.
+   *
+   * @param bucketId identifier of the bucket.
+   * @param time     time associated with the key.
+   * @param key      key (not null)
+   *
+   * @return future whose value is the value of the key if found; null if the key is not found;
+   * {@link BucketedState#EXPIRED} if the time is very old.
+   */
+  Future<Slice> getAsync(long bucketId, long time, @NotNull Slice key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
new file mode 100644
index 0000000..11db44d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of managed state.<br/>
+ *
+ * The important sub-components here are:
+ * <ol>
+ *   <li>
+ *     {@link #checkpointManager}: writes incremental checkpoints in window files and transfers data from window
+ *     files to bucket files.
+ *   </li>
+ *   <li>
+ *     {@link #bucketsFileSystem}: manages writing/reading from all the buckets. A bucket on disk is further sub-divided
+ *     into time-buckets. This abstracts out updating time-buckets and meta files and reading from them.
+ *   </li>
+ *   <li>
+ *     {@link #timeBucketAssigner}: assigns time-buckets to keys and manages the time boundaries.
+ *   </li>
+ *   <li>
+ *     {@link #stateTracker}: tracks the size of data in memory and requests buckets to free memory when enough memory
+ *     is not available.
+ *   </li>
+ *   <li>
+ *     {@link #fileAccess}: plug-able file system abstraction.
+ *   </li>
+ * </ol>
+ * <p/>
+ * <b>Differences between different concrete implementations of {@link AbstractManagedStateImpl}</b>
+ * <table>
+ *   <tr>
+ *     <td></td>
+ *     <td>{@link ManagedStateImpl}</td>
+ *     <td>{@link ManagedTimeStateImpl}</td>
+ *     <td>{@link ManagedTimeUnifiedStateImpl}</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Main buckets</td>
+ *     <td>identified by unique adhoc long ids that the user provides with the key.</td>
+ *     <td>same as ManagedStateImpl.</td>
+ *     <td>user doesn't provide bucket ids and instead just provides time. Time is used to derive the time buckets
+ *     and these are the main buckets.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Data on disk: data in buckets is persisted on disk with each bucket data further divided into
+ *     time-buckets, i.e., {base_path}/{bucketId}/{time-bucket id}</td>
+ *     <td>time-bucket is computed using the system time corresponding to the application window.</td>
+ *     <td>time-bucket is derived from the user provided time.</td>
+ *     <td>time-bucket is derived from the user provided time.
+ *     In this implementation operator id is used to isolate data of different partitions on disk, i.e.,
+ *     {base_path}/{operatorId}/{time-bucket id}</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Bucket partitioning</td>
+ *     <td>bucket belongs to just one partition. Multiple partitions cannot write to the same bucket.</td>
+ *     <td>same as ManagedStateImpl.</td>
+ *     <td>multiple partitions can be working with the same time-bucket since time-bucket is derived from time.
+ *     This works because on disk each partition's data is segregated by the operator id.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Dynamic partitioning</td>
+ *     <td>can support dynamic partitioning by pre-allocating buckets.</td>
+ *     <td>same as ManagedStateImpl.</td>
+ *     <td>will not be able to support dynamic partitioning efficiently.</td>
+ *   </tr>
+ * </table>
+ *
+ */
+public abstract class AbstractManagedStateImpl
+    implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext,
+    TimeBucketAssigner.PurgeListener
+{
+  //optimal size of the cache that triggers eviction of committed data from memory (see getMaxMemorySize)
+  private long maxMemorySize;
+
+  //length of the buckets array; taken from getNumBuckets() when setup creates the array
+  protected int numBuckets;
+
+  @NotNull
+  private FileAccess fileAccess = new TFileImpl.DTFileImpl();
+  @NotNull
+  protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+
+  //buckets indexed by getBucketIdx(bucketId); slots stay null until prepareBucket creates the bucket
+  protected Bucket[] buckets;
+
+  //number of threads of readerService, which serves asynchronous value fetches
+  @Min(1)
+  private int numReaders = 1;
+  //created at the end of setup and shut down in teardown
+  @NotNull
+  protected transient ExecutorService readerService;
+
+  @NotNull
+  protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
+
+  @NotNull
+  protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
+
+  protected transient OperatorContext operatorContext;
+
+  @NotNull
+  protected Comparator<Slice> keyComparator = new SliceComparator();
+
+  //checked in beginWindow: a non-null value is propagated to the caller.
+  //NOTE(review): the writers are not visible in this file; presumably background components record failures here.
+  protected final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
+
+  //defaults to the length of one application window (default streaming window size x default application window
+  //count). NOTE(review): its consumer is not visible here; presumably used by StateTracker.
+  @NotNull
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration checkStateSizeInterval = Duration.millis(
+      DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue);
+
+  //NOTE(review): usage not visible in this part of the file; presumably read by StateTracker.
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration durationPreventingFreeingSpace;
+
+  private transient StateTracker stateTracker = new StateTracker();
+
+  //accessible to StateTracker
+  final transient Object commitLock = new Object();
+
+  //pending asynchronous value fetch tasks grouped by bucket id; registered in getValueFromBucketAsync
+  protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId =
+      Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create());
+
+  /**
+   * Sets up the managed state and its components.<br/>
+   * Initializes the file access and registers this object as the purge listener of the time bucket assigner. It then
+   * sets up the time bucket assigner, the checkpoint manager, the buckets file system, any buckets already present
+   * and the state tracker.<br/>
+   * When the activation window is not {@link Stateless#WINDOW_ID}, window files up to and including the activation
+   * window are loaded into their buckets as recovered data; window files after it are deleted.<br/>
+   * Finally the reader thread pool is created.
+   *
+   * @param context operator context.
+   * @throws RuntimeException wrapping the {@link IOException} raised while recovering.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    operatorContext = context;
+    fileAccess.init();
+
+    timeBucketAssigner.setPurgeListener(this);
+
+    //setup all the managed state components
+    timeBucketAssigner.setup(this);
+    checkpointManager.setup(this);
+    bucketsFileSystem.setup(this);
+
+    if (buckets == null) {
+      //create buckets array only once at start when it is not created.
+      numBuckets = getNumBuckets();
+      buckets = new Bucket[numBuckets];
+    }
+    //buckets restored with the operator state still need their transient setup
+    for (Bucket bucket : buckets) {
+      if (bucket != null) {
+        bucket.setup(this);
+      }
+    }
+
+    stateTracker.setup(this);
+    long activationWindow = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
+
+    if (activationWindow != Stateless.WINDOW_ID) {
+      //delete all the wal files with windows > activationWindow.
+      //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
+      try {
+        for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) {
+          if (recoveredWindow <= activationWindow) {
+            @SuppressWarnings("unchecked")
+            Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+                checkpointManager.load(operatorContext.getId(), recoveredWindow);
+            if (recoveredData != null && !recoveredData.isEmpty()) {
+              //keys of recoveredData are bucket ids
+              for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
+                int bucketIdx = prepareBucket(entry.getKey());
+                buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+              }
+            }
+            //hand the recovered window back to the checkpoint manager without rewriting its window file.
+            //NOTE(review): recoveredData may be null here; confirm save() accepts that.
+            checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
+                true /*skipWritingToWindowFile*/);
+
+          } else {
+            checkpointManager.delete(operatorContext.getId(), recoveredWindow);
+          }
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("recovering", e);
+      }
+    }
+
+    readerService = Executors.newFixedThreadPool(numReaders, new NameableThreadFactory("managedStateReaders"));
+  }
+
+  /**
+   * Gets the number of buckets which is required during setup to create the array of buckets.<br/>
+   * {@link #setup(OperatorContext)} calls it only when the buckets array has not been created yet and stores the
+   * result in {@link #numBuckets}, which {@link #getBucketIdx(long)} uses to map bucket ids to array slots.<br/>
+   * {@link ManagedTimeStateImpl} provides num of buckets which is injected using a property.<br/>
+   * {@link ManagedTimeUnifiedStateImpl} provides num of buckets which are calculated based on time settings.
+   *
+   * @return number of buckets.
+   */
+  public abstract int getNumBuckets();
+
+  /**
+   * Starts a new application window.<br/>
+   * If a failure has been recorded in {@link #throwable} it is rethrown here. Otherwise the window id is forwarded
+   * to the {@link #timeBucketAssigner}.
+   *
+   * @param windowId id of the window that is beginning.
+   */
+  public void beginWindow(long windowId)
+  {
+    //read the reference once so that the checked value and the propagated value are the same instance
+    Throwable failure = throwable.get();
+    if (failure != null) {
+      //Throwables.propagate always throws; the explicit throw makes that visible to readers and the compiler
+      throw Throwables.propagate(failure);
+    }
+    timeBucketAssigner.beginWindow(windowId);
+  }
+
+
+  /**
+   * Prepares the bucket and returns its index.
+   * @param bucketId bucket key
+   * @return bucket index
+   */
+  protected int prepareBucket(long bucketId)
+  {
+    stateTracker.bucketAccessed(bucketId);
+    int bucketIdx = getBucketIdx(bucketId);
+
+    Bucket bucket = buckets[bucketIdx];
+    if (bucket == null) {
+      //bucket is not in memory
+      bucket = newBucket(bucketId);
+      bucket.setup(this);
+      buckets[bucketIdx] = bucket;
+    } else if (bucket.getBucketId() != bucketId) {
+      handleBucketConflict(bucketIdx, bucketId);
+    }
+    return bucketIdx;
+  }
+
+  /**
+   * Stores the key/value pair in the bucket identified by bucketId under the given time bucket.<br/>
+   * A time bucket of -1 is treated as invalid, and the pair is then dropped without being stored.
+   *
+   * @param bucketId   identifier of the bucket.
+   * @param timeBucket time bucket of the key; -1 means the data is not stored.
+   * @param key        key (not null)
+   * @param value      value (not null)
+   */
+  protected void putInBucket(long bucketId, long timeBucket, @NotNull Slice key, @NotNull Slice value)
+  {
+    Preconditions.checkNotNull(key, "key");
+    Preconditions.checkNotNull(value, "value");
+    if (timeBucket == -1) {
+      //invalid time bucket: the data is not stored
+      return;
+    }
+    int idx = prepareBucket(bucketId);
+    buckets[idx].put(key, timeBucket, value);
+  }
+
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key)
+  {
+    Preconditions.checkNotNull(key, "key");
+    int bucketIdx = prepareBucket(bucketId);
+    Bucket bucket = buckets[bucketIdx];
+    synchronized (bucket) {
+      return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
+    }
+  }
+
+  /**
+   * Reads the value of a key asynchronously.<br/>
+   * While holding the bucket's monitor, the in-memory data ({@link Bucket.ReadSource#MEMORY}) is checked first, and
+   * a hit is returned as an already completed future. Otherwise a {@link ValueFetchTask} is registered in
+   * {@link #tasksPerBucketId} and submitted to the {@link #readerService}.
+   *
+   * @param bucketId   identifier of the bucket.
+   * @param timeBucket time bucket of the key.
+   * @param key        key (not null)
+   * @return future holding the value of the key.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key)
+  {
+    Preconditions.checkNotNull(key, "key");
+    int bucketIdx = prepareBucket(bucketId);
+    Bucket bucket = buckets[bucketIdx];
+    synchronized (bucket) {
+      //fast path: value already in memory, no reader thread needed
+      Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY);
+      if (cachedVal != null) {
+        return Futures.immediateFuture(cachedVal);
+      }
+      //the task is recorded before it is submitted, both while the bucket's monitor is held
+      ValueFetchTask valueFetchTask = new ValueFetchTask(bucket, key, timeBucket, this);
+      tasksPerBucketId.put(bucket.getBucketId(), valueFetchTask);
+      return readerService.submit(valueFetchTask);
+    }
+  }
+
+  /**
+   * Invoked by {@link #prepareBucket(long)} when the slot of newBucketId is already occupied by a bucket with a
+   * different id. This default implementation rejects the request; subclasses may override it.
+   *
+   * @param bucketIdx   index of the occupied slot.
+   * @param newBucketId id of the bucket that maps to the occupied slot.
+   * @throws IllegalArgumentException always, in this implementation.
+   */
+  protected void handleBucketConflict(int bucketIdx, long newBucketId)
+  {
+    long occupyingBucketId = buckets[bucketIdx].getBucketId();
+    throw new IllegalArgumentException("bucket conflict " + occupyingBucketId + " " + newBucketId);
+  }
+
+  /**
+   * Maps a bucket id to its slot in {@link #buckets}.<br/>
+   * The result is always in the range [0, numBuckets), including for negative bucket ids. For those, a plain
+   * remainder would yield a negative, invalid array index.
+   *
+   * @param bucketId bucket key
+   * @return index of the bucket in the buckets array
+   */
+  protected int getBucketIdx(long bucketId)
+  {
+    long idx = bucketId % numBuckets;
+    //Java's % keeps the sign of the dividend; shift negative remainders into range
+    return (int)(idx < 0 ? idx + numBuckets : idx);
+  }
+
+  /**
+   * Returns the bucket occupying the slot that bucketId maps to.
+   *
+   * @param bucketId bucket key
+   * @return the bucket in the slot of bucketId; null if the slot has not been populated yet.
+   */
+  Bucket getBucket(long bucketId)
+  {
+    int idx = getBucketIdx(bucketId);
+    return buckets[idx];
+  }
+
+  /**
+   * Creates the bucket object for bucketId. {@link #prepareBucket(long)} calls it when the bucket's slot is empty
+   * and sets up the returned bucket afterwards. This default implementation returns a {@link Bucket.DefaultBucket}.
+   *
+   * @param bucketId bucket key
+   * @return a new bucket that has not been set up yet.
+   */
+  protected Bucket newBucket(long bucketId)
+  {
+    return new Bucket.DefaultBucket(bucketId);
+  }
+
+  /**
+   * Ends the current application window by forwarding the call to the {@link #timeBucketAssigner}.
+   */
+  public void endWindow()
+  {
+    timeBucketAssigner.endWindow();
+  }
+
+  /**
+   * Collects, for every bucket in memory, the data returned by {@link Bucket#checkpoint(long)} and saves it through
+   * the {@link #checkpointManager} as the incremental checkpoint of windowId. Nothing is saved when no bucket
+   * returned data.
+   *
+   * @param windowId id of the window being checkpointed.
+   * @throws RuntimeException wrapping the {@link IOException} raised while saving; the message names the window.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap();
+
+    for (Bucket bucket : buckets) {
+      if (bucket != null) {
+        synchronized (bucket) {
+          Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId);
+          if (!flashDataForBucket.isEmpty()) {
+            flashData.put(bucket.getBucketId(), flashDataForBucket);
+          }
+        }
+      }
+    }
+    if (!flashData.isEmpty()) {
+      try {
+        checkpointManager.save(flashData, operatorContext.getId(), windowId, false /*skipWritingToWindowFile*/);
+      } catch (IOException e) {
+        //include the window id, consistent with the "committing" and "recovering" failures
+        throw new RuntimeException("checkpointing " + windowId, e);
+      }
+    }
+  }
+
+  /**
+   * No-op; data is handed to the checkpoint manager in {@code beforeCheckpoint}.
+   *
+   * @param windowId checkpointed window id
+   */
+  @Override
+  public void checkpointed(long windowId)
+  {
+    //intentionally empty
+  }
+
+  /**
+   * Notifies every allocated bucket, and then the checkpoint manager, that {@code windowId} is committed. The whole
+   * operation runs while holding {@code commitLock}.
+   *
+   * @param windowId committed window id
+   * @throws RuntimeException wrapping an {@link IOException} or {@link InterruptedException}; for the latter the
+   *                          thread's interrupt status is restored before throwing
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void committed(long windowId)
+  {
+    synchronized (commitLock) {
+      try {
+        for (Bucket bucket : buckets) {
+          if (bucket != null) {
+            synchronized (bucket) {
+              bucket.committed(windowId);
+            }
+          }
+        }
+        checkpointManager.committed(operatorContext.getId(), windowId);
+      } catch (IOException e) {
+        throw new RuntimeException("committing " + windowId, e);
+      } catch (InterruptedException e) {
+        //restore the interrupt status so that code further up the stack can still observe the interruption
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("committing " + windowId, e);
+      }
+    }
+  }
+
+  /**
+   * Tears down the checkpoint manager, the buckets file system and the time bucket assigner, then stops the reader
+   * service with {@code shutdownNow()}, tears down every allocated bucket and finally the state tracker.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void teardown()
+  {
+    checkpointManager.teardown();
+    bucketsFileSystem.teardown();
+    timeBucketAssigner.teardown();
+    readerService.shutdownNow();
+    for (Bucket bucket : buckets) {
+      if (bucket == null) {
+        continue;
+      }
+      synchronized (bucket) {
+        bucket.teardown();
+      }
+    }
+    stateTracker.teardown();
+  }
+
+  /**
+   * Hands the latest expired time bucket to the checkpoint manager.
+   *
+   * @param timeBucket time buckets less than or equal to this one may be purged
+   */
+  @Override
+  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+  {
+    this.checkpointManager.setLatestExpiredTimeBucket(timeBucket);
+  }
+
+  /**
+   * @return the operator context held by this managed state
+   */
+  @Override
+  public OperatorContext getOperatorContext()
+  {
+    return this.operatorContext;
+  }
+
+  /**
+   * Sets the optimal size of the cache that triggers eviction of committed data from memory.
+   *
+   * @param bytes size in bytes
+   */
+  @Override
+  public void setMaxMemorySize(long bytes)
+  {
+    this.maxMemorySize = bytes;
+  }
+
+  /**
+   * @return the optimal size of the cache that triggers eviction of committed data from memory.
+   */
+  public long getMaxMemorySize()
+  {
+    return this.maxMemorySize;
+  }
+
+  /**
+   * Sets the {@link FileAccess} implementation.
+   *
+   * @param fileAccess specific implementation of FileAccess; must not be null.
+   */
+  public void setFileAccess(@NotNull FileAccess fileAccess)
+  {
+    Preconditions.checkNotNull(fileAccess);
+    this.fileAccess = fileAccess;
+  }
+
+  /**
+   * @return the configured {@link FileAccess} implementation
+   */
+  @Override
+  public FileAccess getFileAccess()
+  {
+    return this.fileAccess;
+  }
+
+  /**
+   * Sets the time bucket assigner. This can be used for plugging in any custom time bucket assigner.
+   *
+   * @param timeBucketAssigner a {@link TimeBucketAssigner}; must not be null
+   */
+  public void setTimeBucketAssigner(@NotNull TimeBucketAssigner timeBucketAssigner)
+  {
+    Preconditions.checkNotNull(timeBucketAssigner);
+    this.timeBucketAssigner = timeBucketAssigner;
+  }
+
+  /**
+   * @return the configured time bucket assigner
+   */
+  @Override
+  public TimeBucketAssigner getTimeBucketAssigner()
+  {
+    return this.timeBucketAssigner;
+  }
+
+  /**
+   * @return comparator used to order keys in the time bucket files
+   */
+  @Override
+  public Comparator<Slice> getKeyComparator()
+  {
+    return this.keyComparator;
+  }
+
+  /**
+   * Sets the key comparator. The keys in the time bucket files on disk are sorted; this comparator defines that
+   * order.
+   *
+   * @param keyComparator key comparator; must not be null
+   */
+  public void setKeyComparator(@NotNull Comparator<Slice> keyComparator)
+  {
+    Preconditions.checkNotNull(keyComparator);
+    this.keyComparator = keyComparator;
+  }
+
+  /**
+   * @return the component that persists bucket data and meta information on the file system
+   */
+  public BucketsFileSystem getBucketsFileSystem()
+  {
+    return this.bucketsFileSystem;
+  }
+
+  /**
+   * @return number of worker threads configured for the reader service.
+   */
+  public int getNumReaders()
+  {
+    return this.numReaders;
+  }
+
+  /**
+   * Sets the number of worker threads in the reader service, which asynchronously fetches the values of keys. This
+   * should not exceed the number of buckets.
+   *
+   * @param numReaders number of worker threads in the reader service.
+   */
+  public void setNumReaders(int numReaders)
+  {
+    this.numReaders = numReaders;
+  }
+
+  /**
+   * @return the regular interval at which the size of the state is checked.
+   */
+  public Duration getCheckStateSizeInterval()
+  {
+    return this.checkStateSizeInterval;
+  }
+
+  /**
+   * Sets the interval at which the size of the state is regularly checked.
+   *
+   * @param checkStateSizeInterval regular interval at which the size of the state is checked; must not be null.
+   */
+  public void setCheckStateSizeInterval(@NotNull Duration checkStateSizeInterval)
+  {
+    Preconditions.checkNotNull(checkStateSizeInterval);
+    this.checkStateSizeInterval = checkStateSizeInterval;
+  }
+
+  /**
+   * @return the duration that prevents a bucket from being evicted.
+   */
+  public Duration getDurationPreventingFreeingSpace()
+  {
+    return this.durationPreventingFreeingSpace;
+  }
+
+  /**
+   * Sets the duration that prevents buckets from freeing space. For example, if this is set to an hour, only buckets
+   * which were not accessed in the last hour are triggered to free space.
+   *
+   * @param durationPreventingFreeingSpace time duration
+   */
+  public void setDurationPreventingFreeingSpace(Duration durationPreventingFreeingSpace)
+  {
+    this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
+  }
+
+  /**
+   * Looks up the value of a key in a bucket (memory first, then the bucket's file readers). Instances are submitted to
+   * the reader service.
+   * <p/>
+   * When the lookup finishes, whether it succeeds or fails, the task removes itself from the managed state's
+   * {@code tasksPerBucketId}. A failure is also stored in the managed state's {@code throwable} before being
+   * re-thrown.
+   */
+  static class ValueFetchTask implements Callable<Slice>
+  {
+    private final Bucket bucket;
+    private final long timeBucketId;
+    private final Slice key;
+    private final AbstractManagedStateImpl managedState;
+
+    ValueFetchTask(@NotNull Bucket bucket, @NotNull Slice key, long timeBucketId, AbstractManagedStateImpl managedState)
+    {
+      this.bucket = Preconditions.checkNotNull(bucket);
+      this.timeBucketId = timeBucketId;
+      this.key = Preconditions.checkNotNull(key);
+      this.managedState = Preconditions.checkNotNull(managedState);
+    }
+
+    @Override
+    public Slice call() throws Exception
+    {
+      try {
+        synchronized (bucket) {
+          //a particular bucket should only be handled by one thread at any point of time. Handling of bucket here
+          //involves creating readers for the time buckets and de-serializing key/value from a reader.
+          try {
+            return bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
+          } finally {
+            //deregister even when the lookup fails; otherwise a failed task would stay in tasksPerBucketId.
+            managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
+          }
+        }
+      } catch (Throwable t) {
+        managedState.throwable.set(t);
+        throw Throwables.propagate(t);
+      }
+    }
+  }
+
+  /**
+   * Replaces the state tracker; intended for tests.
+   *
+   * @param stateTracker state tracker; must not be null
+   */
+  @VisibleForTesting
+  void setStateTracker(@NotNull StateTracker stateTracker)
+  {
+    Preconditions.checkNotNull(stateTracker, "state tracker");
+    this.stateTracker = stateTracker;
+  }
+
+  /**
+   * Replaces the buckets file system; intended for tests.
+   *
+   * @param bucketsFileSystem buckets file system; must not be null
+   */
+  @VisibleForTesting
+  void setBucketsFileSystem(@NotNull BucketsFileSystem bucketsFileSystem)
+  {
+    Preconditions.checkNotNull(bucketsFileSystem, "buckets file system");
+    this.bucketsFileSystem = bucketsFileSystem;
+  }
+
+  //class-level logger; note that 'transient' has no effect on a static field
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateImpl.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
new file mode 100644
index 0000000..b2c1618
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A bucket that groups events (key/value pairs) with the same bucket id.
+ * <p/>
+ * Data moves through a bucket in stages:
+ * <ol>
+ * <li>values are put in memory;</li>
+ * <li>they are handed over on {@link #checkpoint(long)};</li>
+ * <li>they are promoted on {@link #committed(long)};</li>
+ * <li>once persisted in bucket data files, the committed part can be dropped from memory with
+ * {@link #freeMemory()}.</li>
+ * </ol>
+ */
+public interface Bucket extends ManagedStateComponent
+{
+  /**
+   * @return bucket id
+   */
+  long getBucketId();
+
+  /**
+   * @return approximate size, in bytes, of the data this bucket holds in memory (including loaded file readers).
+   */
+  long getSizeInBytes();
+
+  /**
+   * Get value of a key.
+   *
+   * @param key        key.
+   * @param timeBucket time bucket of the key if known; -1 otherwise.
+   * @param source     source to read from
+   * @return value of the key; null if it is not found in the given source.
+   */
+  Slice get(Slice key, long timeBucket, ReadSource source);
+
+  /**
+   * Set value of a key.
+   *
+   * @param key        key.
+   * @param timeBucket timeBucket of the key.
+   * @param value      value of the key.
+   */
+  void put(Slice key, long timeBucket, Slice value);
+
+  /**
+   * Triggers the bucket to checkpoint. Returns the non checkpointed data so far.
+   *
+   * @param windowId checkpoint window id
+   * @return non checkpointed data.
+   */
+  Map<Slice, BucketedValue> checkpoint(long windowId);
+
+  /**
+   * Triggers the bucket to commit data till provided window id.
+   *
+   * @param windowId window id
+   */
+  void committed(long windowId);
+
+  /**
+   * Triggers bucket to free memory which is already persisted in bucket data files.
+   *
+   * @return amount of memory freed in bytes.
+   * @throws IOException if closing a file reader fails
+   */
+  long freeMemory() throws IOException;
+
+  /**
+   * Allows the bucket to process/cache data which is recovered (from window files) after failure.
+   *
+   * @param windowId recovery window
+   * @param recoveredData recovered data
+   */
+  void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue> recoveredData);
+
+  enum ReadSource
+  {
+    MEMORY,      //state in memory in key/value form
+    READERS,     //these are streams in which the key will be searched and serialized.
+    ALL          //both the above states.
+  }
+
+  /**
+   * A value together with the time bucket it belongs to.
+   */
+  class BucketedValue
+  {
+    private long timeBucket;
+    private Slice value;
+
+    protected BucketedValue()
+    {
+    }
+
+    protected BucketedValue(long timeBucket, Slice value)
+    {
+      this.timeBucket = timeBucket;
+      this.value = value;
+    }
+
+    protected long getTimeBucket()
+    {
+      return timeBucket;
+    }
+
+    protected void setTimeBucket(long timeBucket)
+    {
+      this.timeBucket = timeBucket;
+    }
+
+    public Slice getValue()
+    {
+      return value;
+    }
+
+    public void setValue(Slice value)
+    {
+      this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof BucketedValue)) {
+        return false;
+      }
+
+      BucketedValue that = (BucketedValue)o;
+
+      //Objects.equals tolerates a null value, which the no-arg constructor leaves unset
+      return timeBucket == that.timeBucket && Objects.equals(value, that.value);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(timeBucket, value);
+    }
+  }
+
+  /**
+   * Default bucket.<br/>
+   * Not thread-safe: callers such as AbstractManagedStateImpl synchronize on the bucket instance before using it.
+   */
+  class DefaultBucket implements Bucket
+  {
+    private final long bucketId;
+
+    //Key -> latest value (with its time bucket) put since the last checkpoint
+    private transient Map<Slice, BucketedValue> flash = Maps.newConcurrentMap();
+
+    //Data persisted in write ahead logs. window -> key/values of that window
+    private final transient ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> checkpointedData =
+        new ConcurrentSkipListMap<>();
+
+    //Data persisted in bucket data files
+    private final transient Map<Slice, BucketedValue> committedData = Maps.newConcurrentMap();
+
+    //Data serialized/deserialized from bucket data files: key -> value from latest time bucket on file
+    private final transient Map<Slice, BucketedValue> fileCache = Maps.newConcurrentMap();
+
+    //TimeBucket -> FileReaders
+    private final transient Map<Long, FileAccess.FileReader> readers = Maps.newTreeMap();
+
+    //TimeBucket -> bytes added to sizeInBytes when the reader of that time bucket was loaded
+    private final transient Map<Long, Long> readerSizes = Maps.newHashMap();
+
+    protected transient ManagedStateContext managedStateContext;
+
+    private AtomicLong sizeInBytes = new AtomicLong(0);
+
+    private final transient Slice dummyGetKey = new Slice(null, 0, 0);
+
+    private transient TreeSet<BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
+
+    private DefaultBucket()
+    {
+      //for kryo
+      bucketId = -1;
+    }
+
+    protected DefaultBucket(long bucketId)
+    {
+      this.bucketId = bucketId;
+    }
+
+    @Override
+    public void setup(@NotNull ManagedStateContext managedStateContext)
+    {
+      this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+    }
+
+    @Override
+    public long getBucketId()
+    {
+      return bucketId;
+    }
+
+    @Override
+    public long getSizeInBytes()
+    {
+      return sizeInBytes.longValue();
+    }
+
+    /**
+     * Searches the in-memory state from newest to oldest: flash, check-pointed windows (latest first), committed data
+     * and finally the file cache.
+     */
+    private Slice getFromMemory(Slice key)
+    {
+      //search the cache for key
+      BucketedValue bucketedValue = flash.get(key);
+      if (bucketedValue != null) {
+        return bucketedValue.getValue();
+      }
+
+      for (Long window : checkpointedData.descendingKeySet()) {
+        //traverse the checkpointed data in reverse order
+        bucketedValue = checkpointedData.get(window).get(key);
+        if (bucketedValue != null) {
+          return bucketedValue.getValue();
+        }
+      }
+
+      bucketedValue = committedData.get(key);
+      if (bucketedValue != null) {
+        return bucketedValue.getValue();
+      }
+
+      bucketedValue = fileCache.get(key);
+      if (bucketedValue != null) {
+        return bucketedValue.getValue();
+      }
+
+      return null;
+    }
+
+    /**
+     * Searches the time-bucket files of this bucket. When {@code timeBucket} is -1 all time buckets are searched,
+     * latest first; otherwise only the given one.
+     */
+    private Slice getFromReaders(Slice key, long timeBucket)
+    {
+      try {
+        if (cachedBucketMetas == null) {
+          cachedBucketMetas = managedStateContext.getBucketsFileSystem().getAllTimeBuckets(bucketId);
+        }
+        if (timeBucket != -1) {
+          Slice valSlice = getValueFromTimeBucketReader(key, timeBucket);
+          //the emptiness check guards first(), which throws on an empty set (e.g. a stale cached meta set)
+          if (valSlice != null && !cachedBucketMetas.isEmpty()
+              && timeBucket == cachedBucketMetas.first().getTimeBucketId()) {
+            //if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
+            fileCache.put(key, new BucketedValue(timeBucket, valSlice));
+          }
+          return valSlice;
+        }
+
+        //search all the time buckets
+        for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) {
+
+          if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
+            //keys in the time bucket files are sorted so if the first key in the file is greater than the key being
+            //searched, the key will not be present in that file.
+            Slice valSlice = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
+            if (valSlice != null) {
+              //time buckets are visited from latest to oldest, so this is the value from the latest time bucket on
+              //file that contains the key; that is what the file cache holds.
+              fileCache.put(key, new BucketedValue(immutableTimeBucketMeta.getTimeBucketId(), valSlice));
+              return valSlice;
+            }
+          }
+        }
+        return null;
+      } catch (IOException e) {
+        throw new RuntimeException("get time-buckets " + bucketId, e);
+      }
+    }
+
+    @Override
+    public Slice get(Slice key, long timeBucket, ReadSource readSource)
+    {
+      switch (readSource) {
+        case MEMORY:
+          return getFromMemory(key);
+        case READERS:
+          return getFromReaders(key, timeBucket);
+        case ALL:
+        default:
+          Slice value = getFromMemory(key);
+          if (value != null) {
+            return value;
+          }
+          return getFromReaders(key, timeBucket);
+      }
+    }
+
+    /**
+     * Returns the value for the key from a time-bucket reader, loading the reader first if necessary.
+     *
+     * @param key        key
+     * @param timeBucket time bucket
+     * @return value if key is found in the time bucket; null otherwise
+     */
+    private Slice getValueFromTimeBucketReader(Slice key, long timeBucket)
+    {
+      FileAccess.FileReader fileReader = readers.get(timeBucket);
+      if (fileReader != null) {
+        return readValue(fileReader, key, timeBucket);
+      }
+      //file reader is not loaded and is null
+      try {
+        if (loadFileReader(timeBucket)) {
+          return readValue(readers.get(timeBucket), key, timeBucket);
+        }
+        return null;
+      } catch (IOException e) {
+        throw new RuntimeException("while loading " + bucketId + ", " + timeBucket, e);
+      }
+    }
+
+    private Slice readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
+    {
+      Slice valSlice = new Slice(null, 0, 0);
+      try {
+        if (fileReader.seek(key)) {
+          fileReader.next(dummyGetKey, valSlice);
+          return valSlice;
+        } else {
+          return null;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("reading " + bucketId + ", " + timeBucket, e);
+      }
+    }
+
+    /**
+     * Opens the reader of a time bucket if the time bucket has meta information, and accounts its size.
+     *
+     * @return true if a reader was loaded; false if the time bucket has no meta information
+     */
+    private boolean loadFileReader(long timeBucketId) throws IOException
+    {
+      BucketsFileSystem.TimeBucketMeta tbm = managedStateContext.getBucketsFileSystem()
+          .getTimeBucketMeta(bucketId, timeBucketId);
+
+      if (tbm != null) {
+        FileAccess.FileReader reader = managedStateContext.getBucketsFileSystem().getReader(bucketId,
+            BucketsFileSystem.getFileName(timeBucketId));
+        readers.put(timeBucketId, reader);
+        //remember the accounted size so that exactly this amount is released when the reader is closed
+        readerSizes.put(timeBucketId, tbm.getSizeInBytes());
+        sizeInBytes.getAndAdd(tbm.getSizeInBytes());
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * @return the bytes accounted for the reader of a time bucket, removing that record; 0 if none was recorded.
+     */
+    private long removeReaderSize(long timeBucketId)
+    {
+      Long size = readerSizes.remove(timeBucketId);
+      return size == null ? 0 : size;
+    }
+
+    /**
+     * @return the bytes accounted in sizeInBytes for one key/value entry; Long.SIZE is the fixed per-entry overhead
+     *         charged for the time bucket.
+     */
+    private static long entryFootprint(Slice key, BucketedValue bucketedValue)
+    {
+      Slice value = bucketedValue.getValue();
+      return key.length + Long.SIZE + (value == null ? 0 : value.length);
+    }
+
+    /**
+     * Puts a value in memory. The value replaces an earlier one of the same key put since the last checkpoint unless
+     * that earlier value belongs to a newer time bucket.
+     */
+    @Override
+    public void put(Slice key, long timeBucket, Slice value)
+    {
+      BucketedValue bucketedValue = flash.get(key);
+      if (bucketedValue == null) {
+        //initialize with the given time bucket so that the value is stored whatever the time bucket id is
+        bucketedValue = new BucketedValue(timeBucket, value);
+        flash.put(key, bucketedValue);
+        sizeInBytes.getAndAdd(entryFootprint(key, bucketedValue));
+      } else if (timeBucket >= bucketedValue.getTimeBucket()) {
+        //the same or a newer time bucket overwrites; an older time bucket is ignored
+        int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
+        sizeInBytes.getAndAdd(inc);
+        bucketedValue.setTimeBucket(timeBucket);
+        bucketedValue.setValue(value);
+      }
+    }
+
+    @Override
+    public long freeMemory() throws IOException
+    {
+      LOG.debug("free space {}", bucketId);
+      long memoryFreed = 0;
+      for (Map.Entry<Slice, BucketedValue> entry : committedData.entrySet()) {
+        //mirrors what put(...) and recoveredData(...) account per entry
+        memoryFreed += entryFootprint(entry.getKey(), entry.getValue());
+      }
+      committedData.clear();
+      fileCache.clear();
+
+      //Release every open reader. The cached time-bucket metas cannot be used for this because committed(...) resets
+      //them to null, which would leave readers open.
+      Iterator<Map.Entry<Long, FileAccess.FileReader>> readerIterator = readers.entrySet().iterator();
+      while (readerIterator.hasNext()) {
+        Map.Entry<Long, FileAccess.FileReader> readerEntry = readerIterator.next();
+        //read key and value before remove(): TreeMap may reuse a removed entry node for its successor's mapping
+        long timeBucket = readerEntry.getKey();
+        FileAccess.FileReader reader = readerEntry.getValue();
+        readerIterator.remove();
+        memoryFreed += removeReaderSize(timeBucket);
+        reader.close();
+      }
+      sizeInBytes.getAndAdd(-memoryFreed);
+      return memoryFreed;
+    }
+
+    @Override
+    public Map<Slice, BucketedValue> checkpoint(long windowId)
+    {
+      //hand the current flash over to the check-pointed state and start a fresh flash
+      Map<Slice, BucketedValue> checkpointedFlash = flash;
+      checkpointedData.put(windowId, checkpointedFlash);
+      //use the same concurrent map type the field is initialized with
+      flash = Maps.newConcurrentMap();
+      return checkpointedFlash;
+    }
+
+    @Override
+    public void committed(long committedWindowId)
+    {
+      Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator();
+
+      while (stateIterator.hasNext()) {
+        Map.Entry<Long, Map<Slice, BucketedValue>> entry = stateIterator.next();
+
+        long savedWindow = entry.getKey();
+        if (savedWindow > committedWindowId) {
+          //checkpointedData is ordered by window id, so all remaining windows are newer than the committed one
+          break;
+        }
+        Map<Slice, BucketedValue> bucketData = entry.getValue();
+
+        //removing any stale values from the file cache
+        for (Slice key : bucketData.keySet()) {
+          fileCache.remove(key);
+        }
+
+        for (BucketedValue bucketedValue : bucketData.values()) {
+          if (readers.isEmpty()) {
+            break;
+          }
+          closeModifiedTimeBucketReader(bucketedValue.getTimeBucket());
+        }
+        committedData.putAll(bucketData);
+        stateIterator.remove();
+      }
+
+      cachedBucketMetas = null;
+    }
+
+    /**
+     * Closes the in-memory reader of a time bucket, if one is open, and releases the size accounted for it. Called
+     * for time buckets modified by committed data, since those files will be re-written by BucketsDataManager; a new
+     * reader is loaded on demand by a later read.
+     */
+    private void closeModifiedTimeBucketReader(long timeBucket)
+    {
+      FileAccess.FileReader reader = readers.remove(timeBucket);
+      if (reader == null) {
+        return;
+      }
+      sizeInBytes.getAndAdd(-removeReaderSize(timeBucket));
+      try {
+        LOG.debug("closing reader {} {}", bucketId, timeBucket);
+        reader.close();
+      } catch (IOException e) {
+        throw new RuntimeException("closing reader " + bucketId + ", " + timeBucket, e);
+      }
+    }
+
+    @Override
+    public void recoveredData(long recoveredWindow, Map<Slice, BucketedValue> data)
+    {
+      checkpointedData.put(recoveredWindow, data);
+      //recovered entries occupy memory just like entries added by put(...); freeMemory() releases the same amount
+      for (Map.Entry<Slice, BucketedValue> entry : data.entrySet()) {
+        sizeInBytes.getAndAdd(entryFootprint(entry.getKey(), entry.getValue()));
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      Set<Long> failureBuckets = Sets.newHashSet();
+      for (Map.Entry<Long, FileAccess.FileReader> entry : readers.entrySet()) {
+        try {
+          LOG.debug("closing reader {} {}", bucketId, entry.getKey());
+          entry.getValue().close();
+        } catch (IOException e) {
+          //will try to close all readers
+          failureBuckets.add(entry.getKey());
+        }
+      }
+      if (!failureBuckets.isEmpty()) {
+        StringBuilder builder = new StringBuilder("teardown of ");
+        builder.append(bucketId).append(" < ");
+        String separator = "";
+        for (Long timeBucket : failureBuckets) {
+          //separate the ids so they do not run together into one number
+          builder.append(separator).append(timeBucket);
+          separator = ", ";
+        }
+        builder.append(">");
+        throw new RuntimeException(builder.toString());
+      }
+    }
+
+    @VisibleForTesting
+    Map<Long, FileAccess.FileReader> getReaders()
+    {
+      return readers;
+    }
+
+    @VisibleForTesting
+    Map<Slice, BucketedValue> getCommittedData()
+    {
+      return committedData;
+    }
+
+    @VisibleForTesting
+    ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> getCheckpointedData()
+    {
+      return checkpointedData;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
new file mode 100644
index 0000000..8304fb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeBasedTable;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Persists bucket data on disk and maintains meta information about the buckets.
+ * <p/>
+ *
+ * Each bucket has a meta-data file and the format of that is :<br/>
+ * <ol>
+ * <li>version of the meta data (int)</li>
+ * <li>total number of time-buckets (int)</li>
+ * <li>For each time bucket
+ * <ol>
+ * <li>time bucket key (long)</li>
+ * <li>size of data (sum of bytes) (long)</li>
+ * <li>last transferred window id (long)</li>
+ * <li>length of the first key in the time-bucket file (int)</li>
+ * <li>first key in the time-bucket file (byte[])</li>
+ * </ol>
+ * </li>
+ * </ol>
+ * <p/>
+ * Meta data information is updated by {@link IncrementalCheckpointManager}. Any updates are restricted to the package.
+ */
+public class BucketsFileSystem implements ManagedStateComponent
+{
+  //name of the per-bucket meta-data file (format described in the class javadoc)
+  static final String META_FILE_NAME = "_META";
+  //presumably the version written as the first field of the meta-data file (see class javadoc) - TODO confirm
+  private static final int META_FILE_VERSION = 1;
+
+  //bucket id -> time bucket id -> mutable meta of that time bucket; accessed while synchronized on this table
+  private final transient TreeBasedTable<Long, Long, MutableTimeBucketMeta> timeBucketsMeta = TreeBasedTable.create();
+
+  //Check-pointed set of all buckets this instance has written to.
+  protected final Set<Long> bucketNamesOnFS = new ConcurrentSkipListSet<>();
+
+  //context supplied in setup(...); provides the file access and the key comparator
+  protected transient ManagedStateContext managedStateContext;
+
+  /**
+   * Stores the managed state context used for file access and key ordering.
+   *
+   * @param managedStateContext context; must not be null
+   */
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    Preconditions.checkNotNull(managedStateContext, "managed state context");
+    this.managedStateContext = managedStateContext;
+  }
+
+  /**
+   * @return a writer for a file of the bucket, obtained from the configured {@link FileAccess}
+   */
+  protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    return fileAccess.getWriter(bucketId, fileName);
+  }
+
+  /**
+   * @return a reader for a file of the bucket, obtained from the configured {@link FileAccess}
+   */
+  protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    return fileAccess.getReader(bucketId, fileName);
+  }
+
+  /**
+   * Renames a file of the bucket through the configured {@link FileAccess}.
+   */
+  protected void rename(long bucketId, String fromName, String toName) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    fileAccess.rename(bucketId, fromName, toName);
+  }
+
+  /**
+   * @return an output stream to a file of the bucket, obtained from the configured {@link FileAccess}
+   */
+  protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    return fileAccess.getOutputStream(bucketId, fileName);
+  }
+
+  /**
+   * @return an input stream from a file of the bucket, obtained from the configured {@link FileAccess}
+   */
+  protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    return fileAccess.getInputStream(bucketId, fileName);
+  }
+
+  /**
+   * @return whether the file exists in the bucket, as reported by the configured {@link FileAccess}
+   */
+  protected boolean exists(long bucketId, String fileName) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    return fileAccess.exists(bucketId, fileName);
+  }
+
+  /**
+   * @return iterator over the files of the bucket, obtained from the configured {@link FileAccess}
+   */
+  protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    return fileAccess.listFiles(bucketId);
+  }
+
+  /**
+   * Deletes a file of the bucket through the configured {@link FileAccess}.
+   */
+  protected void delete(long bucketId, String fileName) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    fileAccess.delete(bucketId, fileName);
+  }
+
+  /**
+   * Deletes the whole bucket through the configured {@link FileAccess}.
+   */
+  protected void deleteBucket(long bucketId) throws IOException
+  {
+    FileAccess fileAccess = managedStateContext.getFileAccess();
+    fileAccess.deleteBucket(bucketId);
+  }
+
+  /**
+   * Saves data to a bucket. The data consists of key/values of all time-buckets of a particular bucket.
+   * <p/>
+   * Entries are grouped by time bucket. Each time bucket is processed as follows:
+   * <ol>
+   * <li>If the time bucket was transferred before, its existing file is read and merged with the new entries (new
+   * values win).</li>
+   * <li>The sorted result is written to a temporary file.</li>
+   * <li>The temporary file is renamed to the time bucket's file.</li>
+   * <li>The time bucket meta is updated.</li>
+   * </ol>
+   * Finally the bucket meta file is updated.
+   *
+   * @param windowId        window id
+   * @param bucketId        bucket id
+   * @param data            data of all time-buckets
+   * @throws IOException if reading, writing or renaming a file fails; opened readers and writers are closed even then
+   */
+  protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException
+  {
+    Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(),
+        managedStateContext.getKeyComparator());
+
+    for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) {
+      long timeBucketId = entry.getValue().getTimeBucket();
+      timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue());
+    }
+
+    for (long timeBucket : timeBucketedKeys.rowKeySet()) {
+      BucketsFileSystem.MutableTimeBucketMeta tbm = getOrCreateTimeBucketMeta(bucketId, timeBucket);
+      addBucketName(bucketId);
+      String tmpFileName = getTmpFileName();
+
+      //all key/values of the time bucket file, sorted by the key comparator
+      TreeMap<Slice, Slice> fileData = new TreeMap<>(managedStateContext.getKeyComparator());
+      if (tbm.getLastTransferredWindowId() != -1) {
+        //the time bucket existed so we need to read the file and then re-write it
+        FileAccess.FileReader fileReader = getReader(bucketId, getFileName(timeBucket));
+        try {
+          fileReader.readFully(fileData);
+        } finally {
+          fileReader.close();
+        }
+      }
+      //new values override those read from an existing file
+      for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) {
+        fileData.put(entry.getKey(), entry.getValue().getValue());
+      }
+
+      long dataSize = writeSortedFile(bucketId, tmpFileName, fileData);
+      rename(bucketId, tmpFileName, getFileName(timeBucket));
+      //a table row is never empty, so fileData always has a first (smallest) key
+      tbm.updateTimeBucketMeta(windowId, dataSize, fileData.firstKey());
+    }
+
+    updateBucketMetaFile(bucketId);
+  }
+
+  /**
+   * Writes key/values, in iteration order, to a file of the bucket.
+   *
+   * @param bucketId   bucket id
+   * @param fileName   name of the file to write
+   * @param sortedData key/values to write, expected to be sorted by the key comparator
+   * @return sum of the lengths of all written keys and values
+   * @throws IOException if writing fails; the writer is closed in any case
+   */
+  private long writeSortedFile(long bucketId, String fileName, Map<Slice, Slice> sortedData) throws IOException
+  {
+    long dataSize = 0;
+    FileAccess.FileWriter fileWriter = getWriter(bucketId, fileName);
+    try {
+      for (Map.Entry<Slice, Slice> entry : sortedData.entrySet()) {
+        Slice key = entry.getKey();
+        Slice value = entry.getValue();
+
+        dataSize += key.length;
+        dataSize += value.length;
+
+        fileWriter.append(key.toByteArray(), value.toByteArray());
+      }
+    } finally {
+      fileWriter.close();
+    }
+    return dataSize;
+  }
+
+  /**
+   * Retrieves the time bucket meta of a particular time-bucket, creating and registering a new one if none exists
+   * (neither in memory nor in the bucket's meta file).
+   *
+   * @param bucketId     bucket id
+   * @param timeBucketId time bucket id
+   * @return time bucket meta of the time bucket
+   * @throws IOException if loading the bucket meta file fails
+   */
+  @NotNull
+  MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      MutableTimeBucketMeta existing = timeBucketMetaHelper(bucketId, timeBucketId);
+      if (existing != null) {
+        return existing;
+      }
+      MutableTimeBucketMeta created = new MutableTimeBucketMeta(bucketId, timeBucketId);
+      timeBucketsMeta.put(bucketId, timeBucketId, created);
+      return created;
+    }
+  }
+
+  /**
+   * Registers a bucket id in {@code bucketNamesOnFS}; the buckets registered there are the ones scanned by
+   * {@link #deleteTimeBucketsLessThanEqualTo(long)}.
+   *
+   * @param bucketId bucket id
+   */
+  protected void addBucketName(long bucketId)
+  {
+    this.bucketNamesOnFS.add(bucketId);
+  }
+
+  /**
+   * Returns an immutable snapshot of the meta of a particular time bucket.
+   *
+   * @param bucketId     bucket id
+   * @param timeBucketId time bucket id
+   * @return immutable time bucket meta, or null when the time bucket is neither cached nor in the bucket meta file
+   * @throws IOException if reading the bucket meta file fails
+   */
+  @Nullable
+  public TimeBucketMeta getTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      MutableTimeBucketMeta mutableMeta = timeBucketMetaHelper(bucketId, timeBucketId);
+      return mutableMeta == null ? null : mutableMeta.getImmutableTimeBucketMeta();
+    }
+  }
+
+  /**
+   * Finds the mutable meta of a time bucket. When it is not cached in {@link #timeBucketsMeta} but the bucket has a
+   * meta file, the meta file is loaded and the lookup is repeated.
+   * Callers must already hold the lock on {@link #timeBucketsMeta}.
+   *
+   * @param bucketId      bucket id
+   * @param timeBucketId  time bucket id
+   * @return the mutable time bucket meta, or null when it cannot be found
+   * @throws IOException if reading the bucket meta file fails
+   */
+  private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException
+  {
+    MutableTimeBucketMeta cached = timeBucketsMeta.get(bucketId, timeBucketId);
+    if (cached != null) {
+      return cached;
+    }
+    if (!exists(bucketId, META_FILE_NAME)) {
+      return null;
+    }
+    //not cached: load the meta info of every time bucket of this bucket and look again
+    try (DataInputStream in = getInputStream(bucketId, META_FILE_NAME)) {
+      loadBucketMetaFile(bucketId, in);
+    }
+    return timeBucketsMeta.get(bucketId, timeBucketId);
+  }
+
+  /**
+   * Returns immutable snapshots of the meta of every time bucket of a bucket, ordered from the latest time bucket to
+   * the oldest. The bucket meta file is loaded first when nothing about the bucket is cached yet.
+   *
+   * @param bucketId bucket id
+   * @return all the time buckets in order - latest to oldest; empty when none are known
+   * @throws IOException if reading the bucket meta file fails
+   */
+  public TreeSet<TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      TreeSet<TimeBucketMeta> latestFirst = Sets.newTreeSet(Collections.<TimeBucketMeta>reverseOrder());
+
+      if (!timeBucketsMeta.containsRow(bucketId) && exists(bucketId, META_FILE_NAME)) {
+        //nothing cached for this bucket yet, so bring in the meta info of all its time buckets
+        try (DataInputStream in = getInputStream(bucketId, META_FILE_NAME)) {
+          loadBucketMetaFile(bucketId, in);
+        }
+      }
+      for (MutableTimeBucketMeta mutableMeta : timeBucketsMeta.row(bucketId).values()) {
+        latestFirst.add(mutableMeta.getImmutableTimeBucketMeta());
+      }
+      return latestFirst;
+    }
+  }
+
+  /**
+   * Loads the bucket meta-file into {@link #timeBucketsMeta}. This should be entered only after acquiring the lock on
+   * {@link #timeBucketsMeta}.<br/>
+   *
+   * Time buckets that are already present in memory are kept as they are. The meta file is only ever written from
+   * the in-memory meta (see {@link #updateBucketMetaFile(long)}), so an in-memory entry is never older than the file.
+   * Replacing it would drop updates that have not been persisted yet and detach instances callers still hold.
+   *
+   * @param bucketId bucket id
+   * @param dis      data input stream positioned at the start of the meta file
+   * @throws IOException if reading from the stream fails
+   */
+  private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException
+  {
+    int metaDataVersion = dis.readInt();
+
+    if (metaDataVersion != META_FILE_VERSION) {
+      LOG.warn("ignoring meta file of bucket {} with unsupported version {}", bucketId, metaDataVersion);
+      return;
+    }
+    int numberOfEntries = dis.readInt();
+
+    for (int i = 0; i < numberOfEntries; i++) {
+      long timeBucketId = dis.readLong();
+      long dataSize = dis.readLong();
+      long lastTransferredWindow = dis.readLong();
+
+      int sizeOfFirstKey = dis.readInt();
+      byte[] firstKeyBytes = new byte[sizeOfFirstKey];
+      //always consume the key so the stream stays aligned with the next entry
+      dis.readFully(firstKeyBytes, 0, firstKeyBytes.length);
+
+      if (timeBucketsMeta.get(bucketId, timeBucketId) != null) {
+        //the in-memory instance is at least as recent as the file; keep it
+        continue;
+      }
+      MutableTimeBucketMeta tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
+      tbm.updateTimeBucketMeta(lastTransferredWindow, dataSize, new Slice(firstKeyBytes));
+      timeBucketsMeta.put(bucketId, timeBucketId, tbm);
+    }
+  }
+
+  /**
+   * Persists the meta of every in-memory time bucket of a bucket. The content is written to a temporary file first,
+   * which is then renamed to the meta file name.
+   *
+   * @param bucketId bucket id
+   * @throws IOException if writing or renaming the meta file fails
+   */
+  void updateBucketMetaFile(long bucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      Map<Long, MutableTimeBucketMeta> metaByTimeBucket = Preconditions.checkNotNull(timeBucketsMeta.row(bucketId),
+          "timeBuckets");
+      String tmpFileName = getTmpFileName();
+
+      try (DataOutputStream out = getOutputStream(bucketId, tmpFileName)) {
+        //layout: version, entry count, then per entry: time bucket id, size, last window, key length, key bytes
+        out.writeInt(META_FILE_VERSION);
+        out.writeInt(metaByTimeBucket.size());
+        for (MutableTimeBucketMeta meta : metaByTimeBucket.values()) {
+          out.writeLong(meta.getTimeBucketId());
+          out.writeLong(meta.getSizeInBytes());
+          out.writeLong(meta.getLastTransferredWindowId());
+          Slice firstKey = meta.getFirstKey();
+          out.writeInt(firstKey.length);
+          out.write(firstKey.toByteArray());
+        }
+      }
+      rename(bucketId, tmpFileName, META_FILE_NAME);
+    }
+  }
+
+  /**
+   * Deletes, in every bucket of {@code bucketNamesOnFS}, the data files of all time buckets whose id is less than or
+   * equal to {@code latestExpiredTimeBucket`, and drops their meta. A bucket left without any time-bucket data file
+   * is deleted altogether. Meta and ".tmp" files are ignored while scanning.
+   *
+   * @param latestExpiredTimeBucket the newest expired time bucket
+   * @throws IOException if listing, deleting or updating files fails
+   */
+  protected void deleteTimeBucketsLessThanEqualTo(long latestExpiredTimeBucket) throws IOException
+  {
+    LOG.debug("delete files before {}", latestExpiredTimeBucket);
+
+    for (long bucketName : bucketNamesOnFS) {
+      RemoteIterator<LocatedFileStatus> timeBucketsIterator = listFiles(bucketName);
+      boolean emptyBucket = true;
+      while (timeBucketsIterator.hasNext()) {
+        LocatedFileStatus timeBucketStatus = timeBucketsIterator.next();
+
+        String timeBucketStr = timeBucketStatus.getPath().getName();
+        if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) {
+          //ignoring meta and tmp files
+          continue;
+        }
+        long timeBucket = Long.parseLong(timeBucketStr);
+
+        if (timeBucket <= latestExpiredTimeBucket) {
+          //one argument per placeholder: bucket first, then time bucket
+          LOG.debug("deleting bucket {} time-bucket {}", bucketName, timeBucket);
+          invalidateTimeBucket(bucketName, timeBucket);
+          delete(bucketName, timeBucketStr);
+        } else {
+          emptyBucket = false;
+        }
+      }
+      if (emptyBucket) {
+        LOG.debug("deleting bucket {}", bucketName);
+        deleteBucket(bucketName);
+      }
+    }
+  }
+
+  /**
+   * Removes the meta of a time bucket and rewrites the bucket's meta file without it.<br/>
+   * Both steps run while holding the lock on {@link #timeBucketsMeta} (the lock is reentrant, and
+   * {@link #updateBucketMetaFile(long)} acquires it again). Otherwise another thread could reload the not-yet-updated
+   * meta file in between and bring the removed time bucket back into memory.
+   *
+   * @param bucketId     bucket id
+   * @param timeBucketId time bucket id
+   * @throws IOException if writing the meta file fails
+   */
+  void invalidateTimeBucket(long bucketId, long timeBucketId) throws IOException
+  {
+    synchronized (timeBucketsMeta) {
+      timeBucketsMeta.remove(bucketId, timeBucketId);
+      updateBucketMetaFile(bucketId);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    //intentionally empty: this class releases nothing here
+  }
+
+  /**
+   * This serves the readers - {@link Bucket.DefaultBucket}.
+   * It is immutable and accessible outside the package unlike {@link MutableTimeBucketMeta}.
+   */
+  public static class TimeBucketMeta implements Comparable<TimeBucketMeta>
+  {
+    private final long bucketId;
+    private final long timeBucketId;
+    private long lastTransferredWindowId = -1;
+    private long sizeInBytes;
+    private Slice firstKey;
+
+    private TimeBucketMeta()
+    {
+      //for kryo
+      bucketId = -1;
+      timeBucketId = -1;
+    }
+
+    private TimeBucketMeta(long bucketId, long timeBucketId)
+    {
+      this.bucketId = bucketId;
+      this.timeBucketId = timeBucketId;
+    }
+
+    public long getLastTransferredWindowId()
+    {
+      return lastTransferredWindowId;
+    }
+
+    public long getSizeInBytes()
+    {
+      return this.sizeInBytes;
+    }
+
+    public long getBucketId()
+    {
+      return bucketId;
+    }
+
+    public long getTimeBucketId()
+    {
+      return timeBucketId;
+    }
+
+    public Slice getFirstKey()
+    {
+      return firstKey;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof TimeBucketMeta)) {
+        return false;
+      }
+
+      TimeBucketMeta that = (TimeBucketMeta)o;
+
+      return bucketId == that.bucketId && timeBucketId == that.timeBucketId;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(bucketId, timeBucketId);
+    }
+
+    @Override
+    public int compareTo(@NotNull TimeBucketMeta o)
+    {
+      if (bucketId < o.bucketId) {
+        return -1;
+      }
+      if (bucketId > o.bucketId) {
+        return 1;
+      }
+      if (timeBucketId < o.timeBucketId) {
+        return -1;
+      }
+      if (timeBucketId > o.timeBucketId) {
+        return 1;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Package-private, changeable counterpart of {@link TimeBucketMeta}.<br/>
+   *
+   * It is updated when data from window files is transferred to bucket files, while readers such as
+   * {@link Bucket.DefaultBucket} take an immutable copy via {@link #getImmutableTimeBucketMeta()}. Because different
+   * threads update and read it, both operations are synchronized on the instance.
+   */
+  static class MutableTimeBucketMeta extends TimeBucketMeta
+  {
+    //cached immutable copy, rebuilt on the next read after an update
+    private transient TimeBucketMeta immutableTimeBucketMeta;
+
+    private volatile boolean changed;
+
+    public MutableTimeBucketMeta(long bucketId, long timeBucketId)
+    {
+      super(bucketId, timeBucketId);
+    }
+
+    /**
+     * Records new meta values for the time bucket.
+     *
+     * @param lastTransferredWindow id of the last transferred window
+     * @param bytes                 data size in bytes
+     * @param firstKey              first key; must not be null
+     */
+    synchronized void updateTimeBucketMeta(long lastTransferredWindow, long bytes, @NotNull Slice firstKey)
+    {
+      changed = true;
+      super.lastTransferredWindowId = lastTransferredWindow;
+      super.sizeInBytes = bytes;
+      super.firstKey = Preconditions.checkNotNull(firstKey, "first key");
+    }
+
+    /**
+     * @return an immutable copy reflecting the latest update; the same copy is reused until the next update
+     */
+    synchronized TimeBucketMeta getImmutableTimeBucketMeta()
+    {
+      if (immutableTimeBucketMeta != null && !changed) {
+        return immutableTimeBucketMeta;
+      }
+      TimeBucketMeta snapshot = new TimeBucketMeta(getBucketId(), getTimeBucketId());
+      snapshot.lastTransferredWindowId = getLastTransferredWindowId();
+      snapshot.sizeInBytes = getSizeInBytes();
+      snapshot.firstKey = getFirstKey();
+      immutableTimeBucketMeta = snapshot;
+      changed = false;
+      return snapshot;
+    }
+
+  }
+
+  /**
+   * @param timeBucketId time bucket id
+   * @return name of the data file of the time bucket: the decimal form of its id
+   */
+  protected static String getFileName(long timeBucketId)
+  {
+    return String.valueOf(timeBucketId);
+  }
+
+  /**
+   * Builds a temporary file name from the current system time in milliseconds followed by ".tmp".
+   *
+   * @return temporary file name
+   */
+  protected static String getTmpFileName()
+  {
+    long now = System.currentTimeMillis();
+    return Long.toString(now) + ".tmp";
+  }
+
+  //logger shared by all BucketsFileSystem instances
+  private static final Logger LOG = LoggerFactory.getLogger(BucketsFileSystem.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
new file mode 100644
index 0000000..a372163
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Queues;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Manages state which is written to files by windows. The state from the window files are then transferred to bucket
+ * data files. This class listens to time expiry events issued by {@link TimeBucketAssigner}.
+ *
+ * This component is also responsible for purging old time buckets.
+ */
+public class IncrementalCheckpointManager extends WindowDataManager.FSWindowDataManager
+    implements ManagedStateComponent
+{
+  private static final String WAL_RELATIVE_PATH = "managed_state";
+
+  //windowId => (bucketId => data)
+  private final transient Map<Long, Map<Long, Map<Slice, Bucket.BucketedValue>>> savedWindows = new
+      ConcurrentSkipListMap<>();
+
+  private transient ExecutorService writerService;
+  private transient volatile boolean transfer;
+
+  private final transient LinkedBlockingQueue<Long> windowsToTransfer = Queues.newLinkedBlockingQueue();
+  private final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
+
+  protected transient ManagedStateContext managedStateContext;
+
+  private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1);
+
+  private transient int waitMillis;
+
+
+  public IncrementalCheckpointManager()
+  {
+    super();
+    setRecoveryPath(WAL_RELATIVE_PATH);
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void setup(@NotNull final ManagedStateContext managedStateContext)
+  {
+    this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+    waitMillis = managedStateContext.getOperatorContext().getValue(Context.OperatorContext.SPIN_MILLIS);
+    super.setup(managedStateContext.getOperatorContext());
+
+    writerService = Executors.newSingleThreadExecutor(new NameableThreadFactory("managed-state-writer"));
+    transfer = true;
+    writerService.submit(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        while (transfer) {
+          transferWindowFiles();
+          if (latestExpiredTimeBucket.get() > -1) {
+            try {
+              managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(
+                  latestExpiredTimeBucket.getAndSet(-1));
+            } catch (IOException e) {
+              throwable.set(e);
+              LOG.debug("delete files", e);
+              Throwables.propagate(e);
+            }
+          }
+        }
+      }
+    });
+  }
+
+  protected void transferWindowFiles()
+  {
+    try {
+      Long windowId = windowsToTransfer.poll();
+      if (windowId != null) {
+        try {
+          LOG.debug("transfer window {}", windowId);
+          //bucket id => bucket data(key => value, time-buckets)
+          Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = savedWindows.remove(windowId);
+
+          for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
+            managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(),
+                singleBucket.getValue());
+          }
+          storageAgent.delete(managedStateContext.getOperatorContext().getId(), windowId);
+        } catch (Throwable t) {
+          throwable.set(t);
+          LOG.debug("transfer window {}", windowId, t);
+          Throwables.propagate(t);
+        }
+      } else {
+        Thread.sleep(waitMillis);
+      }
+    } catch (InterruptedException ex) {
+      //sleep can be interrupted by teardown so no need to re-throw interrupt exception
+      LOG.debug("interrupted", ex);
+    }
+  }
+
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    throw new UnsupportedOperationException("doesn't support saving any object");
+  }
+
+  /**
+   * The unsaved state combines data received in multiple windows. This window data manager persists this data
+   * on disk by the window id in which it was requested.
+   * @param unsavedData   un-saved data of all buckets.
+   * @param operatorId    operator id.
+   * @param windowId      window id.
+   * @param skipWriteToWindowFile flag that enables/disables saving the window file.
+   *
+   * @throws IOException
+   */
+  public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, int operatorId, long windowId,
+      boolean skipWriteToWindowFile)
+      throws IOException
+  {
+    Throwable lthrowable;
+    if ((lthrowable = throwable.get()) != null) {
+      LOG.error("Error while transferring");
+      Throwables.propagate(lthrowable);
+    }
+    savedWindows.put(windowId, unsavedData);
+
+    if (!skipWriteToWindowFile) {
+      super.save(unsavedData, operatorId, windowId);
+    }
+  }
+
+
+
+  /**
+   * Transfers the data which has been committed till windowId to data files.
+   *
+   * @param operatorId operator id
+   * @param windowId   window id
+   */
+  @SuppressWarnings("UnusedParameters")
+  protected void committed(int operatorId, long windowId) throws IOException, InterruptedException
+  {
+    LOG.debug("data manager committed {}", windowId);
+    for (Long currentWindow : savedWindows.keySet()) {
+      if (currentWindow <= windowId) {
+        LOG.debug("to transfer {}", windowId);
+        windowsToTransfer.add(currentWindow);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    transfer = false;
+    writerService.shutdownNow();
+  }
+
+  public void setLatestExpiredTimeBucket(long timeBucket)
+  {
+    latestExpiredTimeBucket.set(timeBucket);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManager.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
new file mode 100644
index 0000000..12928f1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * Managed state has a limit on the amount of data it keeps in memory.
+ */
+public interface ManagedState
+{
+  /**
+   * Sets the maximum amount of memory the state may use for data held in memory.
+   * @param bytes max size in bytes.
+   */
+  void setMaxMemorySize(long bytes);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
new file mode 100644
index 0000000..1044e15
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * A part of managed state whose lifecycle is driven through a {@link ManagedStateContext}.
+ */
+public interface ManagedStateComponent
+{
+  /**
+   * Callback to set up the component using the managed state context.
+   *
+   * @param managedStateContext managed state context; must not be null
+   */
+  void setup(@NotNull ManagedStateContext managedStateContext);
+
+  /**
+   * Callback to perform teardown.
+   */
+  void teardown();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
new file mode 100644
index 0000000..406fdbd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Comparator;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Gives the components of managed state access to the objects they share.
+ */
+public interface ManagedStateContext
+{
+  /**
+   * @return file access of managed state
+   */
+  FileAccess getFileAccess();
+
+  /**
+   * @return context of the operator in which managed state runs
+   */
+  Context.OperatorContext getOperatorContext();
+
+  /**
+   * @return comparator that orders keys, e.g. when the key/value pairs of a time-bucket file are rewritten in order
+   */
+  Comparator<Slice> getKeyComparator();
+
+  /**
+   * @return buckets file system through which bucket data and bucket meta files are written
+   */
+  BucketsFileSystem getBucketsFileSystem();
+
+  /**
+   * @return assigner of time buckets
+   */
+  TimeBucketAssigner getTimeBucketAssigner();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
new file mode 100644
index 0000000..4c3cf84
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Basic implementation of {@link AbstractManagedStateImpl} in which the keys of a bucket are sub-grouped into time
+ * buckets by a time value rather than by a client-supplied time. That time starts at the system time when the
+ * instance is created and advances by the length of one application window at every end window.<br/>
+ */
+public class ManagedStateImpl extends AbstractManagedStateImpl implements BucketedState
+{
+  //time used to pick the time bucket of keys put in the current application window; not transient
+  private long time = System.currentTimeMillis();
+  //length of an application window in milliseconds; computed in setup
+  private transient long timeIncrement;
+
+  public ManagedStateImpl()
+  {
+    this.numBuckets = 1;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
+        * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+  }
+
+  /**
+   * Puts a key/value pair into the time bucket that the current window's time maps to.
+   */
+  @Override
+  public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
+  {
+    putInBucket(bucketId, timeBucketAssigner.getTimeBucketFor(time), key, value);
+  }
+
+  /**
+   * Looks the key up in the bucket without restricting the search to a time bucket.
+   */
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    return getValueFromBucketSync(bucketId, -1, key);
+  }
+
+  /**
+   * Returns the future through which the value is obtained.<br/>
+   * If the key is present in the bucket cache, the future has its value set when it is constructed;
+   * otherwise the value is set later, once it has been read from the data files.
+   *
+   * @param bucketId bucket id
+   * @param key      key
+   * @return future holding the value of the key, or null if the key is not found
+   */
+  @Override
+  public Future<Slice> getAsync(long bucketId, @NotNull Slice key)
+  {
+    return getValueFromBucketAsync(bucketId, -1, key);
+  }
+
+  /**
+   * Ends the window and then moves the time forward by one application window.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    time = time + timeIncrement;
+  }
+
+  @Min(1)
+  @Override
+  public int getNumBuckets()
+  {
+    return this.numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets.
+   *
+   * @param numBuckets number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+}


[2/3] incubator-apex-malhar git commit: APEXMALHAR-1897 added managed state

Posted by th...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
new file mode 100644
index 0000000..708cfeb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Implementation of {@link AbstractManagedStateImpl} in which the client supplies a time with each key.
+ * That time determines the time-bucket of the key; a time that maps to time bucket -1 is treated as expired.
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements TimeSlicedBucketedState
+{
+  public ManagedTimeStateImpl()
+  {
+    this.numBuckets = 1;
+  }
+
+  @Override
+  public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
+  {
+    putInBucket(bucketId, timeBucketAssigner.getTimeBucketFor(time), key, value);
+  }
+
+  /**
+   * Looks the key up in the bucket without restricting the search to a time bucket.
+   */
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    return getValueFromBucketSync(bucketId, -1, key);
+  }
+
+  /**
+   * Looks the key up in the time bucket derived from the given time.
+   *
+   * @return the value, or {@link BucketedState#EXPIRED} when the time is expired
+   */
+  @Override
+  public Slice getSync(long bucketId, long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    //an expired time maps to time bucket -1, so there is no point in looking further
+    return timeBucket == -1 ? BucketedState.EXPIRED : getValueFromBucketSync(bucketId, timeBucket, key);
+  }
+
+  @Override
+  public Future<Slice> getAsync(long bucketId, Slice key)
+  {
+    return getValueFromBucketAsync(bucketId, -1, key);
+  }
+
+  /**
+   * Asynchronously looks the key up in the time bucket derived from the given time.
+   *
+   * @return future of the value; an already completed future holding {@link BucketedState#EXPIRED} when the time
+   *         is expired
+   */
+  @Override
+  public Future<Slice> getAsync(long bucketId, long time, Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket != -1) {
+      return getValueFromBucketAsync(bucketId, timeBucket, key);
+    }
+    //an expired time maps to time bucket -1, so there is no point in looking further
+    return Futures.immediateFuture(BucketedState.EXPIRED);
+  }
+
+  @Min(1)
+  @Override
+  public int getNumBuckets()
+  {
+    return this.numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets.
+   *
+   * @param numBuckets number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
new file mode 100644
index 0000000..6f531eb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * In this implementation of {@link AbstractManagedStateImpl} the buckets in memory are time-buckets: the time bucket
+ * computed by the {@link TimeBucketAssigner} is used both as the bucket id and as the time bucket.
+ * <p/>
+ * Buckets of purged time buckets, and buckets replaced because of a bucket conflict, are collected and torn down in
+ * {@link #endWindow()} once no asynchronous queries are pending for them.
+ */
+public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implements BucketedState
+{
+  //time buckets passed to purgeTimeBucketsLessThanEqualTo(); drained in endWindow()
+  private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = Queues.newLinkedBlockingQueue();
+  //buckets removed from the in-memory array which still need to be torn down
+  private final transient Set<Bucket> bucketsForTeardown = Sets.newHashSet();
+
+  public ManagedTimeUnifiedStateImpl()
+  {
+    bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
+  }
+
+  /**
+   * @return the number of time buckets computed by the time bucket assigner.
+   */
+  @Override
+  public int getNumBuckets()
+  {
+    return timeBucketAssigner.getNumBuckets();
+  }
+
+  /**
+   * Stores a key/value pair in the time bucket that {@code time} falls into.
+   */
+  @Override
+  public void put(long time, @NotNull Slice key, @NotNull Slice value)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    //the time bucket doubles as the bucket id.
+    //NOTE(review): timeBucket is -1 for expired times; presumably putInBucket ignores it — confirm.
+    putInBucket(timeBucket, timeBucket, key, value);
+  }
+
+  /**
+   * Synchronously fetches the value of a key from the time bucket that {@code time} falls into.
+   *
+   * @return {@link BucketedState#EXPIRED} when the time is expired.
+   */
+  @Override
+  public Slice getSync(long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so return expired slice.
+      return BucketedState.EXPIRED;
+    }
+    return getValueFromBucketSync(timeBucket, timeBucket, key);
+  }
+
+  /**
+   * Asynchronously fetches the value of a key from the time bucket that {@code time} falls into.
+   *
+   * @return an already-completed future holding {@link BucketedState#EXPIRED} when the time is expired.
+   */
+  @Override
+  public Future<Slice> getAsync(long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so return expired slice.
+      return Futures.immediateFuture(BucketedState.EXPIRED);
+    }
+    return getValueFromBucketAsync(timeBucket, timeBucket, key);
+  }
+
+  /**
+   * Removes purged time buckets from memory and tears down every removed bucket which has no pending
+   * asynchronous queries. Buckets with pending queries are retried in a later window.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    Long purgedTimeBucket;
+
+    //collect all the purged time buckets
+    while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) {
+      int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
+      //the slot may already hold a newer time bucket, in which case it must be kept.
+      if (buckets[purgedTimeBucketIdx] != null && buckets[purgedTimeBucketIdx].getBucketId() == purgedTimeBucket) {
+        bucketsForTeardown.add(buckets[purgedTimeBucketIdx]);
+        buckets[purgedTimeBucketIdx] = null;
+      }
+    }
+
+    //tear down all the eligible time buckets
+    Iterator<Bucket> bucketIterator = bucketsForTeardown.iterator();
+    while (bucketIterator.hasNext()) {
+      Bucket bucket = bucketIterator.next();
+      if (!tasksPerBucketId.containsKey(bucket.getBucketId())) {
+        //no pending asynchronous queries for this bucket id
+        bucket.teardown();
+        bucketIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Replaces the bucket at {@code bucketIdx} with a new bucket for {@code newBucketId}; the old bucket is
+   * scheduled for teardown.
+   *
+   * @throws IllegalArgumentException if the new time bucket is not newer than the one it replaces.
+   */
+  @Override
+  protected void handleBucketConflict(int bucketIdx, long newBucketId)
+  {
+    Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < newBucketId, "new time bucket should have a value"
+        + " greater than the old time bucket");
+    //Time buckets are purged periodically so here a bucket conflict is expected and so we just ignore conflicts.
+    bucketsForTeardown.add(buckets[bucketIdx]);
+    buckets[bucketIdx] = newBucket(newBucketId);
+    buckets[bucketIdx].setup(this);
+  }
+
+  /**
+   * Queues {@code timeBucket} for removal from memory in {@link #endWindow()} and then delegates to the parent.
+   */
+  @Override
+  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+  {
+    purgedTimeBuckets.add(timeBucket);
+    super.purgeTimeBucketsLessThanEqualTo(timeBucket);
+  }
+
+  /**
+   * This uses operator id instead of bucket id as the name of parent folder of time-buckets. This is because
+   * multiple partitions may work on same time-buckets.
+   */
+  private static class TimeUnifiedBucketsFileSystem extends BucketsFileSystem
+  {
+    /**
+     * @return the id of the operator, used in place of every bucket id as the parent folder.
+     */
+    private int operatorFolderId()
+    {
+      return managedStateContext.getOperatorContext().getId();
+    }
+
+    @Override
+    protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getWriter(operatorFolderId(), fileName);
+    }
+
+    @Override
+    protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getReader(operatorFolderId(), fileName);
+    }
+
+    @Override
+    protected void rename(long bucketId, String fromName, String toName) throws IOException
+    {
+      managedStateContext.getFileAccess().rename(operatorFolderId(), fromName, toName);
+    }
+
+    @Override
+    protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getOutputStream(operatorFolderId(), fileName);
+    }
+
+    @Override
+    protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getInputStream(operatorFolderId(), fileName);
+    }
+
+    @Override
+    protected boolean exists(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().exists(operatorFolderId(), fileName);
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
+    {
+      return managedStateContext.getFileAccess().listFiles(operatorFolderId());
+    }
+
+    @Override
+    protected void delete(long bucketId, String fileName) throws IOException
+    {
+      managedStateContext.getFileAccess().delete(operatorFolderId(), fileName);
+    }
+
+    @Override
+    protected void deleteBucket(long bucketId) throws IOException
+    {
+      managedStateContext.getFileAccess().deleteBucket(operatorFolderId());
+    }
+
+    @Override
+    protected void addBucketName(long bucketId)
+    {
+      long operatorId = operatorFolderId();
+      if (!bucketNamesOnFS.contains(operatorId)) {
+        bucketNamesOnFS.add(operatorId);
+      }
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ManagedTimeUnifiedStateImpl.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
new file mode 100644
index 0000000..a359b31
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Tracks the size of state in memory and evicts buckets.
+ * <p/>
+ * Buckets are kept ordered by the time they were last accessed. This task runs periodically on a {@link Timer}; it
+ * sums the in-memory size of all buckets and, while the sum exceeds the maximum memory size, asks the least recently
+ * accessed buckets to free memory. Eviction stops early at a bucket accessed within
+ * {@link AbstractManagedStateImpl#getDurationPreventingFreeingSpace()}.
+ */
+class StateTracker extends TimerTask
+{
+  //bucket id -> bucket id & time wrapper
+  private final transient ConcurrentHashMap<Long, BucketIdTimeWrapper> bucketAccessTimes = new ConcurrentHashMap<>();
+
+  //wrappers ordered by last access time (least recently accessed first), then by bucket id
+  private transient ConcurrentSkipListSet<BucketIdTimeWrapper> bucketHeap;
+
+  private final transient Timer memoryFreeService = new Timer();
+
+  protected transient AbstractManagedStateImpl managedStateImpl;
+
+  /**
+   * Creates the access-ordered set of buckets and schedules this task every
+   * {@link AbstractManagedStateImpl#getCheckStateSizeInterval()}.
+   *
+   * @param managedStateImpl managed state whose buckets are tracked
+   */
+  void setup(@NotNull AbstractManagedStateImpl managedStateImpl)
+  {
+    this.managedStateImpl = Preconditions.checkNotNull(managedStateImpl, "managed state impl");
+
+    this.bucketHeap = new ConcurrentSkipListSet<>(
+        new Comparator<BucketIdTimeWrapper>()
+        {
+          //Note: this comparator imposes orderings that are inconsistent with equals.
+          @Override
+          public int compare(BucketIdTimeWrapper o1, BucketIdTimeWrapper o2)
+          {
+            if (o1.getLastAccessedTime() < o2.getLastAccessedTime()) {
+              return -1;
+            }
+            if (o1.getLastAccessedTime() > o2.getLastAccessedTime()) {
+              return 1;
+            }
+
+            return Long.compare(o1.bucketId, o2.bucketId);
+          }
+        });
+    long intervalMillis = managedStateImpl.getCheckStateSizeInterval().getMillis();
+    memoryFreeService.scheduleAtFixedRate(this, intervalMillis, intervalMillis);
+  }
+
+  /**
+   * Records that a bucket was accessed now, moving it to the most recently accessed end of the heap.
+   *
+   * @param bucketId id of the accessed bucket
+   */
+  void bucketAccessed(long bucketId)
+  {
+    BucketIdTimeWrapper idTimeWrapper = bucketAccessTimes.get(bucketId);
+    if (idTimeWrapper != null) {
+      //must be removed before its time changes because the heap is ordered by time.
+      bucketHeap.remove(idTimeWrapper);
+    } else {
+      idTimeWrapper = new BucketIdTimeWrapper(bucketId);
+      //remember the wrapper so later accesses update it instead of adding duplicate entries to the heap.
+      bucketAccessTimes.put(bucketId, idTimeWrapper);
+    }
+    idTimeWrapper.setLastAccessedTime(System.currentTimeMillis());
+    bucketHeap.add(idTimeWrapper);
+  }
+
+  /**
+   * Frees memory of the least recently accessed buckets while the total size of buckets exceeds the maximum memory
+   * size.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void run()
+  {
+    synchronized (managedStateImpl.commitLock) {
+      //freeing of state needs to be stopped during commit as commit results in transferring data to a state which
+      // can be freed up as well.
+      long bytesSum = 0;
+      for (Bucket bucket : managedStateImpl.buckets) {
+        if (bucket != null) {
+          bytesSum += bucket.getSizeInBytes();
+        }
+      }
+
+      if (bytesSum > managedStateImpl.getMaxMemorySize()) {
+        Duration duration = managedStateImpl.getDurationPreventingFreeingSpace();
+        long durationMillis = 0;
+        if (duration != null) {
+          durationMillis = duration.getMillis();
+        }
+
+        BucketIdTimeWrapper idTimeWrapper;
+        while (bytesSum > managedStateImpl.getMaxMemorySize() && null != (idTimeWrapper = leastRecentlyAccessed())) {
+          //trigger buckets to free space
+
+          if (System.currentTimeMillis() - idTimeWrapper.getLastAccessedTime() < durationMillis) {
+            //if the least recently used bucket cannot free up space because it was accessed within the
+            //specified duration then subsequent buckets cannot free space as well because this heap is ordered by time.
+            break;
+          }
+          long bucketId = idTimeWrapper.bucketId;
+          Bucket bucket = managedStateImpl.getBucket(bucketId);
+          if (bucket != null) {
+
+            synchronized (bucket) {
+              long sizeFreed;
+              try {
+                sizeFreed = bucket.freeMemory();
+                LOG.debug("size freed {} {}", bucketId, sizeFreed);
+              } catch (IOException e) {
+                managedStateImpl.throwable.set(e);
+                throw new RuntimeException("freeing " + bucketId, e);
+              }
+              bytesSum -= sizeFreed;
+            }
+          }
+          //stop tracking the bucket even when it is no longer in memory; otherwise the same entry would stay at the
+          //head of the heap and this loop would never terminate.
+          bucketHeap.remove(idTimeWrapper);
+          bucketAccessTimes.remove(bucketId);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return the least recently accessed wrapper, or null when no bucket is tracked.
+   */
+  private BucketIdTimeWrapper leastRecentlyAccessed()
+  {
+    try {
+      return bucketHeap.first();
+    } catch (NoSuchElementException e) {
+      //first() throws instead of returning null; the heap can become empty concurrently while bucketAccessed()
+      //removes and re-adds an entry.
+      return null;
+    }
+  }
+
+  /**
+   * Cancels the timer which runs this task.
+   */
+  void teardown()
+  {
+    memoryFreeService.cancel();
+  }
+
+  /**
+   * Wrapper class for bucket id and the last time the bucket was accessed.
+   */
+  private static class BucketIdTimeWrapper
+  {
+    private final long bucketId;
+    private long lastAccessedTime;
+
+    BucketIdTimeWrapper(long bucketId)
+    {
+      this.bucketId = bucketId;
+    }
+
+    private synchronized long getLastAccessedTime()
+    {
+      return lastAccessedTime;
+    }
+
+    private synchronized void setLastAccessedTime(long lastAccessedTime)
+    {
+      this.lastAccessedTime = lastAccessedTime;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof BucketIdTimeWrapper)) {
+        return false;
+      }
+
+      BucketIdTimeWrapper that = (BucketIdTimeWrapper)o;
+      //Note: the comparator used with bucket heap imposes orderings that are inconsistent with equals
+      return bucketId == that.bucketId;
+
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return (int)(bucketId ^ (bucketId >>> 32));
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(StateTracker.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
new file mode 100644
index 0000000..745353b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.appdata.query.WindowBoundedService;
+
+/**
+ * Keeps track of time buckets.<br/>
+ *
+ * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
+ * which time-bucket an event falls into and sliding the time boundaries.
+ * <p/>
+ *
+ * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
+ * time during initialization of TimeBucketAssigner) are used to calculate the number of time-buckets.<br/>
+ * For example, if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
+ * <code>referenceInstant = current-time</code>, then <code>
+ *   numBuckets = 60 minutes / 30 minutes = 2 </code>.<br/>
+ *
+ * These properties once configured shouldn't be changed because that will result in different time-buckets
+ * for the same (key,time) pair after a failure.
+ * <p/>
+ *
+ * The time boundaries, start (inclusive) and end (exclusive), periodically move by the span of a single time-bucket.
+ * Any event with time &lt; start is expired. These boundaries slide between application windows by the expiry task
+ * asynchronously.<br/>
+ * The boundaries move only between application windows to ensure consistency of a checkpoint. Checkpoint will happen
+ * at application window boundaries so if we do not restrict moving start and end within an app window boundary, it may
+ * happen that old value of 'start' is saved with the new value of 'end'.
+ *
+ * <p/>
+ *
+ * The boundaries can also be moved by {@link #getTimeBucketFor(long)}. The time which is passed as an argument to this
+ * method can be at or ahead of <code>end</code>. This means that the corresponding event is a future event
+ * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
+ */
+public class TimeBucketAssigner implements ManagedStateComponent
+{
+  @NotNull
+  private Instant referenceInstant = new Instant();
+
+  @NotNull
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration expireBefore = Duration.standardDays(2);
+
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration bucketSpan;
+
+  private long bucketSpanMillis;
+
+  //current non-expired time range is [start, end)
+  private long start;
+  private long end;
+  private int numBuckets;
+  //referenceInstant - expireBefore; time bucket keys are counted from this point
+  private transient long fixedStart;
+  //key of the time bucket at start; passed to the purge listener when the expiry task slides the boundaries
+  private transient long lowestTimeBucket;
+
+  private boolean initialized;
+
+  private transient WindowBoundedService windowBoundedService;
+
+  private transient PurgeListener purgeListener = null;
+
+  //slides the boundaries by one time bucket and notifies the purge listener of the time bucket which expired
+  private final transient Runnable expiryTask = new Runnable()
+  {
+    @Override
+    public void run()
+    {
+      synchronized (lock) {
+        start += bucketSpanMillis;
+        end += bucketSpanMillis;
+        if (purgeListener != null) {
+          purgeListener.purgeTimeBucketsLessThanEqualTo(lowestTimeBucket++);
+        }
+      }
+    }
+  };
+
+  private final transient Object lock = new Object();
+
+  /**
+   * Computes the boundaries on first use (they are retained across restarts afterwards) and starts the service that
+   * runs the expiry task every bucket span.
+   *
+   * @throws IllegalArgumentException on first initialization if expireBefore or the bucket span is not positive.
+   */
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    Context.OperatorContext context = managedStateContext.getOperatorContext();
+    fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();
+
+    if (!initialized) {
+      //a non-positive expiry would yield no time buckets at all
+      Preconditions.checkArgument(expireBefore.getMillis() > 0, "expire before should be positive: %s", expireBefore);
+      if (bucketSpan == null) {
+        bucketSpan = Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+            context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS));
+      }
+      start = fixedStart;
+      bucketSpanMillis = bucketSpan.getMillis();
+      //every key computation divides by the span
+      Preconditions.checkArgument(bucketSpanMillis > 0, "bucket span should be positive: %s", bucketSpan);
+      numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
+      end = start + (numBuckets * bucketSpanMillis);
+
+      initialized = true;
+    }
+    lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
+    windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask);
+    windowBoundedService.setup(context);
+  }
+
+  public void beginWindow(long windowId)
+  {
+    windowBoundedService.beginWindow(windowId);
+  }
+
+  public void endWindow()
+  {
+    windowBoundedService.endWindow();
+  }
+
+  /**
+   * Get the bucket key for the long value. A value at or beyond <code>end</code> slides the boundaries forward by
+   * whole time buckets so that the value falls within [start, end).
+   *
+   * @param value value from which bucket key is derived.
+   * @return -1 if value is already expired; bucket key otherwise.
+   */
+  public long getTimeBucketFor(long value)
+  {
+    synchronized (lock) {
+      if (value < start) {
+        return -1;
+      }
+      long diffFromStart = value - fixedStart;
+      long key = diffFromStart / bucketSpanMillis;
+      //end is exclusive: a value equal to end already belongs to a time bucket numBuckets past the lowest one, so the
+      //boundaries must slide for it too, otherwise numBuckets + 1 time buckets would be live at the same time.
+      if (value >= end) {
+        long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis;
+        start += move;
+        end += move;
+        //NOTE(review): lowestTimeBucket is not advanced here, so the expiry task keeps purging from the previous
+        //lowest time bucket, whereas setup() recomputes it from start after a restart — confirm this is intended.
+      }
+      return key;
+    }
+  }
+
+  public void setPurgeListener(@NotNull PurgeListener purgeListener)
+  {
+    this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowBoundedService.teardown();
+  }
+
+  /**
+   * @return number of buckets.
+   */
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * @return reference instant
+   */
+  public Instant getReferenceInstant()
+  {
+    return referenceInstant;
+  }
+
+  /**
+   * Sets the reference instant (by default the system time when the streaming app is created).
+   * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}.
+   *
+   * @param referenceInstant instant from which the initial time boundaries are derived
+   */
+  public void setReferenceInstant(Instant referenceInstant)
+  {
+    this.referenceInstant = referenceInstant;
+  }
+
+  /**
+   * @return duration before which the data is expired.
+   */
+  public Duration getExpireBefore()
+  {
+    return expireBefore;
+  }
+
+  /**
+   * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired.
+   * @param expireBefore duration
+   */
+  public void setExpireBefore(Duration expireBefore)
+  {
+    this.expireBefore = expireBefore;
+  }
+
+  /**
+   * @return time-bucket span
+   */
+  public Duration getBucketSpan()
+  {
+    return bucketSpan;
+  }
+
+  /**
+   * Sets the length of a time bucket. When not set, the length of an application window is used.
+   * @param bucketSpan length of time bucket
+   */
+  public void setBucketSpan(Duration bucketSpan)
+  {
+    this.bucketSpan = bucketSpan;
+  }
+
+  /**
+   * The listener is informed when the time slides and time buckets which are older than the smallest time bucket
+   * (changed because of time slide) can be purged.
+   */
+  public interface PurgeListener
+  {
+    void purgeTimeBucketsLessThanEqualTo(long timeBucket);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
new file mode 100644
index 0000000..99c8dd1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Unstable
+package org.apache.apex.malhar.lib.state.managed;
+
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
new file mode 100644
index 0000000..e75d867
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Unstable
+package org.apache.apex.malhar.lib.state;
+
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
new file mode 100644
index 0000000..51e9a13
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link BucketsFileSystem}: writing bucket data and maintaining time bucket meta.
+ */
+public class BucketsFileSystemTest
+{
+  class TestMeta extends TestWatcher
+  {
+    BucketsFileSystem bucketsFileSystem;
+    String applicationPath;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+
+      managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(7));
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      bucketsFileSystem = new BucketsFileSystem();
+
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testTransferBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testTransferOfExistingBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+
+    Map<Slice, Bucket.BucketedValue> more = ManagedStateTestUtils.getTestBucketData(50, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, more);
+
+    unsavedBucket0.putAll(more);
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testUpdateBucketMetaDataFile() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    BucketsFileSystem.MutableTimeBucketMeta mutableTbm = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    mutableTbm.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+
+    testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+    BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNotNull(immutableTbm);
+    Assert.assertEquals("last transferred window", 10, immutableTbm.getLastTransferredWindowId());
+    Assert.assertEquals("size in bytes", 100, immutableTbm.getSizeInBytes());
+    Assert.assertEquals("first key", "1", immutableTbm.getFirstKey().stringValue());
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testGetTimeBucketMeta() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    BucketsFileSystem.TimeBucketMeta bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNull("bucket meta", bucketMeta);
+
+    testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNotNull("bucket meta not null", bucketMeta);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testGetAllTimeBucketMeta() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    createTimeBucketMetas();
+
+    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+        testMeta.bucketsFileSystem.getAllTimeBuckets(1);
+    //without this check an empty result would pass the ordering loop below vacuously
+    Assert.assertEquals("number of time buckets", 2, timeBucketMetas.size());
+
+    Iterator<BucketsFileSystem.TimeBucketMeta> iterator = timeBucketMetas.iterator();
+    int i = 2;
+    while (iterator.hasNext()) {
+      BucketsFileSystem.TimeBucketMeta tbm = iterator.next();
+      Assert.assertEquals("time bucket " + i, i, tbm.getTimeBucketId());
+      i--;
+    }
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testInvalidateTimeBucket() throws IOException
+  {
+    //populate directly instead of calling testGetAllTimeBucketMeta(), which set up and tore down the file system
+    //again before invalidateTimeBucket() was exercised
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    createTimeBucketMetas();
+    testMeta.bucketsFileSystem.invalidateTimeBucket(1, 1);
+    BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNull("deleted tbm", immutableTbm);
+
+    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+        testMeta.bucketsFileSystem.getAllTimeBuckets(1);
+
+    Assert.assertEquals("only 1 tbm", 1, timeBucketMetas.size());
+    immutableTbm = timeBucketMetas.iterator().next();
+
+    Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId());
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  /**
+   * Creates the meta of time buckets 1 and 2 of bucket 1 and writes the bucket meta file. Expects the buckets file
+   * system to be set up already.
+   */
+  private void createTimeBucketMetas() throws IOException
+  {
+    BucketsFileSystem.MutableTimeBucketMeta tbm1 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    tbm1.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+
+    BucketsFileSystem.MutableTimeBucketMeta tbm2 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 2);
+    tbm2.updateTimeBucketMeta(10, 100, new Slice("2".getBytes()));
+
+    testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
new file mode 100644
index 0000000..cb8a97f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Unit tests for {@link Bucket.DefaultBucket} (constructed with id 1): in-memory puts, reads through file
+ * readers, checkpointing, committing, teardown and memory freeing.
+ * <p>
+ * NOTE(review): several tests invoke other {@code @Test} methods (e.g. testCheckpointed calls testPut), so
+ * {@code defaultBucket.setup(...)} and {@code defaultBucket.teardown()} run more than once per test and the
+ * bucket keeps being used after an inner teardown. The assertions rely on DefaultBucket tolerating this —
+ * confirm before reordering these calls.
+ */
+public class DefaultBucketTest
+{
+
+  /**
+   * Per-test fixture: cleans the target directory, creates a mock managed-state context whose file access is
+   * rooted at {@code target/<test class>/<test method>/bucket_data}, creates the bucket under test and sets up
+   * the buckets file system. When the test finishes it tears down the buckets file system and cleans the
+   * target directory again.
+   */
+  class TestMeta extends TestWatcher
+  {
+    Bucket.DefaultBucket defaultBucket;
+    String applicationPath;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      defaultBucket = new Bucket.DefaultBucket(1);
+      managedStateContext.getBucketsFileSystem().setup(managedStateContext);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      managedStateContext.getBucketsFileSystem().teardown();
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * Puts key "1" into time bucket 1 and checks that it is readable from memory but not from readers, and
+   * that the reported bucket size equals key length + value length + 64.
+   */
+  @Test
+  public void testPut()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.defaultBucket.put(one, 1, one);
+
+    Slice value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+
+    // nothing has been written to disk in this test, so no reader can supply the key
+    value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.READERS);
+    Assert.assertNull("value not present", value);
+
+    // NOTE(review): 64 looks like a fixed per-entry overhead in DefaultBucket's size accounting — confirm
+    Assert.assertEquals("size of bucket", one.length * 2 + 64, testMeta.defaultBucket.getSizeInBytes());
+    testMeta.defaultBucket.teardown();
+  }
+
+  /**
+   * Writes keys "0".."4" (time buckets 100..104) to disk for window 1 / bucket 1, verifies the written files,
+   * and then reads key "1" back through the bucket's readers.
+   */
+  @Test
+  public void testGetFromReader() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+
+    // presumably -1 means "no specific time bucket" — confirm against DefaultBucket.get
+    Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.READERS);
+    Assert.assertEquals("value one", one, value);
+
+    testMeta.defaultBucket.teardown();
+  }
+
+  /**
+   * Same setup as testGetFromReader, but reads key "1" from its own time bucket (101).
+   */
+  @Test
+  public void testGetFromSpecificTimeBucket() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+
+    // key "1" is the second generated key, so getTestBucketData assigned it time bucket 100 + 1
+    Slice value = testMeta.defaultBucket.get(one, 101, Bucket.ReadSource.READERS);
+    Assert.assertEquals("value one", one, value);
+
+    testMeta.defaultBucket.teardown();
+  }
+
+  /**
+   * After testPut, checkpointing window 10 must return exactly the one unsaved entry (key "1", time bucket 1).
+   */
+  @Test
+  public void testCheckpointed()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testPut();
+    Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
+    Assert.assertEquals("size", 1, unsaved.size());
+
+    Map.Entry<Slice, Bucket.BucketedValue> entry = unsaved.entrySet().iterator().next();
+    Assert.assertEquals("key", one, entry.getKey());
+    Assert.assertEquals("value", one, entry.getValue().getValue());
+    Assert.assertEquals("time bucket", 1, entry.getValue().getTimeBucket());
+    testMeta.defaultBucket.teardown();
+  }
+
+  /**
+   * After committing window 10, the checkpointed key "1" must still be readable from memory.
+   */
+  @Test
+  public void testCommitted()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testCheckpointed();
+    testMeta.defaultBucket.committed(10);
+    Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+    testMeta.defaultBucket.teardown();
+  }
+
+  /**
+   * With a reader open for time bucket 101, putting, checkpointing and committing new data for that time
+   * bucket is expected to drop the reader from the readers map while both keys stay readable from memory.
+   */
+  @Test
+  public void testCommittedWithOpenReader() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    testGetFromReader();
+    Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders();
+    Assert.assertTrue("reader open", readers.containsKey(101L));
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    testMeta.defaultBucket.put(two, 101, two);
+    Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
+    Assert.assertEquals("size", 1, unsaved.size());
+    testMeta.defaultBucket.committed(10);
+
+    Slice value = testMeta.defaultBucket.get(two, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value two", two, value);
+
+    value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+
+    Assert.assertTrue("reader closed", !readers.containsKey(101L));
+    testMeta.defaultBucket.teardown();
+  }
+
+  /**
+   * Opens a reader (via testGetFromReader) and then tears the bucket down.
+   */
+  @Test
+  public void testTeardown() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    testGetFromReader();
+    Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders();
+    Assert.assertTrue("reader open", readers.containsKey(101L));
+
+    testMeta.defaultBucket.teardown();
+    // NOTE(review): the message says "reader closed" but the condition requires the reader entry to still be
+    // present after teardown; either the message or the condition looks wrong — confirm against
+    // DefaultBucket.teardown before changing either.
+    Assert.assertTrue("reader closed", readers.containsKey(101L));
+  }
+
+  /**
+   * freeMemory is expected to release exactly the size present before the new put, leaving only the newly put
+   * entry's size (key length + value length + 64) accounted for.
+   */
+  @Test
+  public void testFreeMemory() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    testGetFromReader();
+    long initSize = testMeta.defaultBucket.getSizeInBytes();
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.defaultBucket.put(two, 101, two);
+
+    Assert.assertEquals("size", initSize + (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+
+    long sizeFreed = testMeta.defaultBucket.freeMemory();
+    Assert.assertEquals("size freed", initSize, sizeFreed);
+    Assert.assertEquals("existing size", (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+    testMeta.defaultBucket.teardown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
new file mode 100644
index 0000000..ca64693
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import javax.validation.constraints.NotNull;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link IncrementalCheckpointManager}: kryo round-tripping, saving and loading window data,
+ * transferring committed window files into bucket files, and purging time buckets from disk.
+ */
+public class IncrementalCheckpointManagerTest
+{
+  /**
+   * Per-test fixture: cleans the target directory, builds a mock managed-state context for operator 1 whose
+   * application path and file-access base path live under {@code target/<test class>/<test method>}, and sets up
+   * the time-bucket assigner and buckets file system. Both are torn down, and the directory cleaned, on finish.
+   */
+  class TestMeta extends TestWatcher
+  {
+    IncrementalCheckpointManager checkpointManager;
+    String applicationPath;
+    int operatorId = 1;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      Context.OperatorContext operatorContext = ManagedStateTestUtils.getOperatorContext(operatorId, applicationPath);
+      managedStateContext = new MockManagedStateContext(operatorContext);
+
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      checkpointManager = new IncrementalCheckpointManager();
+
+      managedStateContext.getTimeBucketAssigner().setup(managedStateContext);
+      managedStateContext.getBucketsFileSystem().setup(managedStateContext);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      managedStateContext.getTimeBucketAssigner().teardown();
+      managedStateContext.getBucketsFileSystem().teardown();
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    IncrementalCheckpointManager deserialized = KryoCloneUtils.cloneObject(testMeta.checkpointManager);
+    Assert.assertNotNull("state window data manager", deserialized);
+  }
+
+  /**
+   * Saves data for buckets 0..4 in window 10, then loads it with a fresh manager and compares.
+   */
+  @Test
+  public void testSave() throws IOException
+  {
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.teardown();
+
+    testMeta.checkpointManager = new IncrementalCheckpointManager();
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+    @SuppressWarnings("unchecked")
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5After = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+        testMeta.checkpointManager.load(testMeta.operatorId, 10);
+
+    Assert.assertEquals("saved", buckets5, buckets5After);
+    testMeta.checkpointManager.teardown();
+  }
+
+  /**
+   * Saves window 10, commits it, transfers the window files synchronously and verifies every bucket's files.
+   */
+  @Test
+  public void testTransferWindowFiles() throws IOException, InterruptedException
+  {
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    //transferWindowFiles() is called synchronously below, so shut down the manager's other thread first.
+    testMeta.checkpointManager.teardown();
+    Thread.sleep(500);
+
+    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    testMeta.checkpointManager.transferWindowFiles();
+
+    for (int i = 0; i < 5; i++) {
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i,
+          buckets5.get((long)i), 1);
+    }
+  }
+
+  /**
+   * Commits window 10 with the manager running and waits until all 5 buckets have been written for that window.
+   */
+  @Test
+  public void testCommitted() throws IOException, InterruptedException
+  {
+    CountDownLatch latch = new CountDownLatch(5);
+    MockBucketsFileSystem mockBucketsFileSystem = new MockBucketsFileSystem(latch);
+
+    testMeta.managedStateContext.setBucketsFileSystem(mockBucketsFileSystem);
+
+    mockBucketsFileSystem.setup(testMeta.managedStateContext);
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+    Map<Long, Map<Slice, Bucket.BucketedValue>> data = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(data, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    // bounded wait: an unbounded await() would hang the build forever if the transfer never happens
+    Assert.assertTrue("all buckets written for window 10",
+        latch.await(30, java.util.concurrent.TimeUnit.SECONDS));
+    testMeta.checkpointManager.teardown();
+    Thread.sleep(500);
+
+    for (int i = 0; i < 5; i++) {
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i, data.get((long)i), 1);
+    }
+  }
+
+  /**
+   * After transferring window files, deleting all time buckets up to 200 must leave the bucket data dir empty.
+   */
+  @Test
+  public void testPurge() throws IOException, InterruptedException
+  {
+    FileSystem fileSystem = FileSystem.newInstance(new Configuration());
+    try {
+      testTransferWindowFiles();
+      Path bucketDataPath = new Path(testMeta.applicationPath + "/bucket_data");
+      RemoteIterator<LocatedFileStatus> iterator = fileSystem.listLocatedStatus(bucketDataPath);
+      Assert.assertTrue(iterator.hasNext());
+
+      testMeta.managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(200);
+
+      iterator = fileSystem.listLocatedStatus(bucketDataPath);
+      Assert.assertFalse("All buckets should be deleted", iterator.hasNext());
+    } finally {
+      // newInstance() returns an uncached FileSystem owned by this test, so it must be closed here
+      fileSystem.close();
+    }
+  }
+
+  /**
+   * BucketsFileSystem that counts down a latch each time bucket data for window 10 has been written.
+   */
+  static class MockBucketsFileSystem extends BucketsFileSystem
+  {
+    private final transient CountDownLatch latch;
+
+    public MockBucketsFileSystem(@NotNull CountDownLatch latch)
+    {
+      super();
+      this.latch = Preconditions.checkNotNull(latch);
+    }
+
+    @Override
+    protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data)
+        throws IOException
+    {
+      super.writeBucketData(windowId, bucketId, data);
+      if (windowId == 10) {
+        latch.countDown();
+      }
+    }
+  }
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManagerTest.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
new file mode 100644
index 0000000..f86b5d3
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link ManagedStateImpl} using bucket 0: kryo round-tripping, synchronous and asynchronous gets,
+ * incremental checkpointing and committing.
+ */
+public class ManagedStateImplTest
+{
+
+  /**
+   * Per-test fixture: cleans the target directory, creates the managed state with its file-access base path under
+   * {@code target/<test class>/<test method>/bucket_data}, and builds an operator context for operator 1.
+   */
+  class TestMeta extends TestWatcher
+  {
+    ManagedStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new ManagedStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    ManagedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+    // expected value (the original object) first, actual (the clone) second
+    Assert.assertEquals("num buckets", testMeta.managedState.getNumBuckets(), deserialized.getNumBuckets());
+  }
+
+  @Test
+  public void testSimplePutGet()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(0, one, one);
+    Slice value = testMeta.managedState.getSync(0, one);
+    testMeta.managedState.endWindow();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromFlash() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(0, one, one);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Slice value = valFuture.get();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  /**
+   * Each window's puts must show up in the bucket's checkpointed data under that window id.
+   */
+  @Test
+  public void testIncrementalCheckpoint()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+    Assert.assertEquals("value of one", one, defaultBucket.getCheckpointedData().get(time).get(one).getValue());
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.beginWindow(time + 1);
+    testMeta.managedState.put(0, two, two);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time + 1);
+
+    Assert.assertEquals("value of two", two, defaultBucket.getCheckpointedData().get(time + 1).get(two).getValue());
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromCheckpoint() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Assert.assertEquals("value of one", one, valFuture.get());
+    testMeta.managedState.teardown();
+  }
+
+  /**
+   * Only the committed window's key ("1") may appear in committed data; the later window's key ("2") must not.
+   */
+  @Test
+  public void testCommitted()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    commitHelper(one, two);
+    Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+    Assert.assertEquals("value of one", one, defaultBucket.getCommittedData().get(one).getValue());
+
+    Assert.assertNull("value of two", defaultBucket.getCommittedData().get(two));
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromCommitted() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    commitHelper(one, two);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Assert.assertEquals("value of one", one, valFuture.get());
+    // commitHelper ran setup(); tear down like every other test so the managed state's resources are released
+    testMeta.managedState.teardown();
+  }
+
+  /**
+   * Sets up the managed state, puts {@code one} in window t and {@code two} in window t + 1, checkpoints both
+   * windows, and then commits only window t.
+   */
+  private void commitHelper(Slice one, Slice two)
+  {
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    testMeta.managedState.beginWindow(time + 1);
+    testMeta.managedState.put(0, two, two);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time + 1);
+
+    testMeta.managedState.committed(time);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
new file mode 100644
index 0000000..f2251bd
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+import org.junit.Assert;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Shared helpers for the managed-state tests: target-directory cleanup, verification of bucket data written to
+ * disk, generation of test data, operator contexts and slices.
+ */
+public class ManagedStateTestUtils
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ManagedStateTestUtils.class);
+
+  /**
+   * Deletes {@code target/<test class name>} if it exists.
+   *
+   * @param description JUnit description of the running test; only its class name is used
+   * @throws RuntimeException wrapping any IOException raised while deleting the directory
+   */
+  static void cleanTargetDir(Description description)
+  {
+    try {
+      File out = new File("target/" + description.getClassName());
+      if (out.exists()) {
+        FileUtils.deleteDirectory(out);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Verifies that the files of a bucket contain exactly the expected data.
+   * <p>
+   * Every file of the bucket except the bucket meta file and {@code .tmp} files is read fully into one sorted
+   * map. After each file the map must have grown by {@code keysPerTimeBucket} entries. At the end the map must
+   * equal {@code unsavedBucket} with every value unwrapped.
+   *
+   * @param fileAccess        file access used to list and open the bucket's files
+   * @param bucketId          bucket to verify
+   * @param unsavedBucket     expected data of the bucket
+   * @param keysPerTimeBucket number of new keys each time-bucket file is expected to contribute
+   * @throws IOException if listing, reading or closing a file fails
+   */
+  static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> unsavedBucket,
+      int keysPerTimeBucket) throws IOException
+  {
+    RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
+    TreeMap<Slice, Slice> fromDisk = Maps.newTreeMap(new SliceComparator());
+    int size = 0;
+    while (iterator.hasNext()) {
+      LocatedFileStatus fileStatus = iterator.next();
+
+      String timeBucketStr = fileStatus.getPath().getName();
+      if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) {
+        //ignoring meta file and temporary files
+        continue;
+      }
+
+      LOG.debug("bucket {} time-bucket {}", bucketId, timeBucketStr);
+
+      FileAccess.FileReader reader = fileAccess.getReader(bucketId, timeBucketStr);
+      try {
+        reader.readFully(fromDisk);
+      } finally {
+        // close each reader so verifying a bucket does not leak one open file handle per time-bucket file
+        reader.close();
+      }
+      size += keysPerTimeBucket;
+      Assert.assertEquals("size of bucket " + bucketId, size, fromDisk.size());
+    }
+
+    Assert.assertEquals("size of bucket " + bucketId, unsavedBucket.size(), fromDisk.size());
+
+    Map<Slice, Slice> testBucket = Maps.transformValues(unsavedBucket, new Function<Bucket.BucketedValue, Slice>()
+    {
+      @Override
+      public Slice apply(@Nullable Bucket.BucketedValue input)
+      {
+        assert input != null;
+        return input.getValue();
+      }
+    });
+    Assert.assertEquals("data of bucket " + bucketId, testBucket, fromDisk);
+  }
+
+  /**
+   * Builds test data for buckets {@code startBucket} (inclusive) to {@code endBucket} (exclusive); every bucket
+   * gets the same five entries produced by {@code getTestBucketData(keyStart, 100)}.
+   */
+  static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
+  {
+    Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap();
+    for (int i = startBucket; i < endBucket; i++) {
+      Map<Slice, Bucket.BucketedValue> bucketData = getTestBucketData(keyStart, 100);
+      data.put((long)i, bucketData);
+    }
+    return data;
+  }
+
+  /**
+   * Builds five entries whose keys are the decimal strings {@code keyStart .. keyStart + 4}. Each value equals
+   * its key, and the j-th entry is assigned time bucket {@code timeBucketStart + j}.
+   */
+  static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
+  {
+    Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
+    for (int j = 0; j < 5; j++) {
+      Slice keyVal = getSliceFor(Integer.toString(keyStart));
+      bucketData.put(keyVal, new Bucket.BucketedValue(timeBucketStart + j, keyVal));
+      keyStart++;
+    }
+    return bucketData;
+  }
+
+  /**
+   * Creates a test operator context with the given id and {@code DAG.APPLICATION_PATH} attribute.
+   */
+  static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath)
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
+  }
+
+  /**
+   * Creates a test operator context with the given id and no attributes set.
+   */
+  static Context.OperatorContext getOperatorContext(int operatorId)
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
+  }
+
+  /**
+   * Returns a slice over the UTF-8 bytes of {@code x}; an explicit charset keeps test data platform-independent.
+   */
+  static Slice getSliceFor(String x)
+  {
+    return new Slice(x.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
new file mode 100644
index 0000000..ac4db39
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link ManagedTimeStateImpl}: Kryo round-tripping, put/get with an explicit time,
+ * asynchronous reads of data written straight to bucket files, and recovery after redeployment.
+ * Every test that calls {@code setup} tears the state down in a {@code finally} block so a failing
+ * assertion does not leave the managed state running.
+ */
+public class ManagedTimeStateImplTest
+{
+  /**
+   * Per-test fixture. Before each test it cleans the target directory, creates a fresh
+   * {@link ManagedTimeStateImpl} whose bucket files live under
+   * {@code target/<test class>/<test method>/bucket_data}, and builds an operator context with
+   * id 1 for that application path. After each test it cleans the target directory again.
+   * Declared static because it never uses the enclosing test instance.
+   */
+  static class TestMeta extends TestWatcher
+  {
+    ManagedTimeStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new ManagedTimeStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    ManagedTimeStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+    // JUnit order: expected (the original) first, actual (the clone) second
+    Assert.assertEquals("num buckets", testMeta.managedState.getNumBuckets(), deserialized.getNumBuckets());
+  }
+
+  @Test
+  public void testAsyncGetFromReaders() throws IOException, ExecutionException, InterruptedException
+  {
+    Slice zero = ManagedStateTestUtils.getSliceFor("0");
+    long time = System.currentTimeMillis();
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+    try {
+      Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
+      // write bucket 0 to disk explicitly so the lookup goes through the file readers
+      testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
+
+      Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero);
+
+      Assert.assertEquals("value of zero", zero, valFuture.get());
+    } finally {
+      testMeta.managedState.teardown();
+    }
+  }
+
+  @Test
+  public void testPutGetWithTime()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    try {
+      long time = System.currentTimeMillis();
+      testMeta.managedState.beginWindow(0);
+      testMeta.managedState.put(0, time, one, one);
+      Slice value = testMeta.managedState.getSync(0, time, one);
+      testMeta.managedState.endWindow();
+
+      Assert.assertEquals("value of one", one, value);
+    } finally {
+      testMeta.managedState.teardown();
+    }
+  }
+
+  @Test
+  public void testAsyncGetWithTime() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    try {
+      long time = System.currentTimeMillis();
+      testMeta.managedState.beginWindow(0);
+      testMeta.managedState.put(0, time, one, one);
+      Future<Slice> valFuture = testMeta.managedState.getAsync(0, time, one);
+      Slice value = valFuture.get();
+
+      Assert.assertEquals("value of one", one, value);
+    } finally {
+      testMeta.managedState.teardown();
+    }
+  }
+
+  @Test
+  public void testRecovery() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(0, time, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(0);
+
+    testMeta.managedState.teardown();
+
+    //there is a failure and the operator is re-deployed.
+    // NOTE(review): a fresh StateTracker is installed, presumably because teardown stopped the old one
+    testMeta.managedState.setStateTracker(new StateTracker());
+
+    // activation window 0 matches the window passed to beforeCheckpoint above
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L);
+    Context.OperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+    testMeta.managedState.setup(operatorContext);
+    try {
+      Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+      Assert.assertEquals("value of one", one, defaultBucket.get(one, time, Bucket.ReadSource.MEMORY));
+    } finally {
+      // the redeployed instance was previously never torn down
+      testMeta.managedState.teardown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
new file mode 100644
index 0000000..523a10a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link ManagedTimeUnifiedStateImpl}, whose put/get calls take a time instead of an
+ * explicit bucket id: Kryo round-tripping, in-window put/get, and sync/async reads of data written
+ * straight to bucket files. Every test that calls {@code setup} tears the state down in a
+ * {@code finally} block so a failing assertion does not leave the managed state running.
+ */
+public class ManagedTimeUnifiedStateImplTest
+{
+  /**
+   * Per-test fixture. Before each test it cleans the target directory, creates a fresh
+   * {@link ManagedTimeUnifiedStateImpl} whose bucket files live under
+   * {@code target/<test class>/<test method>/bucket_data}, and builds an operator context with
+   * id 9 for that application path. After each test it cleans the target directory again.
+   * Declared static because it never uses the enclosing test instance.
+   */
+  static class TestMeta extends TestWatcher
+  {
+    ManagedTimeUnifiedStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new ManagedTimeUnifiedStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(9, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    ManagedTimeUnifiedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+    // JUnit order: expected (the original) first, actual (the clone) second
+    Assert.assertEquals("num buckets", testMeta.managedState.getNumBuckets(), deserialized.getNumBuckets());
+  }
+
+  @Test
+  public void testSimplePutGet()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    try {
+      long time = System.currentTimeMillis();
+      testMeta.managedState.beginWindow(0);
+      testMeta.managedState.put(time, one, one);
+      Slice value = testMeta.managedState.getSync(time, one);
+      testMeta.managedState.endWindow();
+
+      Assert.assertEquals("value of one", one, value);
+    } finally {
+      testMeta.managedState.teardown();
+    }
+  }
+
+  @Test
+  public void testAsyncGet() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    try {
+      long time = System.currentTimeMillis();
+      testMeta.managedState.beginWindow(0);
+      testMeta.managedState.put(time, one, one);
+      Future<Slice> valFuture = testMeta.managedState.getAsync(time, one);
+      Slice value = valFuture.get();
+
+      Assert.assertEquals("value of one", one, value);
+    } finally {
+      testMeta.managedState.teardown();
+    }
+  }
+
+  @Test
+  public void testSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
+  {
+    Slice zero = ManagedStateTestUtils.getSliceFor("0");
+    long time = System.currentTimeMillis();
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+    try {
+      long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+      Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
+
+      //write data to disk explicitly
+      testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(),
+          testMeta.operatorContext.getId(), unsavedBucket0, 1);
+
+      Slice value = testMeta.managedState.getSync(time, zero);
+
+      Assert.assertEquals("value of zero", zero, value);
+    } finally {
+      testMeta.managedState.teardown();
+    }
+  }
+
+  @Test
+  public void testAsyncSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
+  {
+    Slice zero = ManagedStateTestUtils.getSliceFor("0");
+    long time = System.currentTimeMillis();
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+    try {
+      long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+      Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
+
+      //write data to disk explicitly
+      testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(),
+          testMeta.operatorContext.getId(), unsavedBucket0, 1);
+
+      Future<Slice> valFuture = testMeta.managedState.getAsync(time, zero);
+
+      Assert.assertEquals("value of zero", zero, valFuture.get());
+    } finally {
+      testMeta.managedState.teardown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
new file mode 100644
index 0000000..8ae4db7
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Comparator;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Test double for {@link ManagedStateContext} that keeps its collaborators in plain fields.
+ * The operator context is fixed at construction. Every other collaborator starts with a default
+ * instance and can be swapped through a package-private setter.
+ */
+class MockManagedStateContext implements ManagedStateContext
+{
+  private final Context.OperatorContext operatorContext;
+
+  // defaults, created in this order whenever a mock is instantiated
+  private TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
+  private Comparator<Slice> keyComparator = new SliceComparator();
+  private BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
+  private TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+
+  /**
+   * @param operatorContext context returned by {@link #getOperatorContext()}
+   */
+  public MockManagedStateContext(Context.OperatorContext operatorContext)
+  {
+    this.operatorContext = operatorContext;
+  }
+
+  @Override
+  public Context.OperatorContext getOperatorContext()
+  {
+    return operatorContext;
+  }
+
+  @Override
+  public FileAccess getFileAccess()
+  {
+    return fileAccess;
+  }
+
+  /** Replaces the file access returned by {@link #getFileAccess()}. */
+  void setFileAccess(TFileImpl.DTFileImpl fileAccess)
+  {
+    this.fileAccess = fileAccess;
+  }
+
+  @Override
+  public Comparator<Slice> getKeyComparator()
+  {
+    return keyComparator;
+  }
+
+  /** Replaces the comparator returned by {@link #getKeyComparator()}. */
+  void setKeyComparator(Comparator<Slice> keyComparator)
+  {
+    this.keyComparator = keyComparator;
+  }
+
+  /** Returns the buckets file system held by this mock. */
+  public BucketsFileSystem getBucketsFileSystem()
+  {
+    return bucketsFileSystem;
+  }
+
+  /** Replaces the buckets file system returned by {@link #getBucketsFileSystem()}. */
+  void setBucketsFileSystem(BucketsFileSystem bucketsFileSystem)
+  {
+    this.bucketsFileSystem = bucketsFileSystem;
+  }
+
+  @Override
+  public TimeBucketAssigner getTimeBucketAssigner()
+  {
+    return timeBucketAssigner;
+  }
+
+  /** Replaces the assigner returned by {@link #getTimeBucketAssigner()}. */
+  void setTimeBucketAssigner(TimeBucketAssigner timeBucketAssigner)
+  {
+    this.timeBucketAssigner = timeBucketAssigner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
new file mode 100644
index 0000000..8a3e521
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+public class StateTrackerTest
+{
+  static class TestMeta extends TestWatcher
+  {
+    MockManagedStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new MockManagedStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      managedState.setNumBuckets(2);
+      managedState.setMaxMemorySize(100);
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testEviction() throws InterruptedException
+  {
+    testMeta.managedState.latch = new CountDownLatch(1);
+    testMeta.managedState.setup(testMeta.operatorContext);
+
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(1, one, one);
+    testMeta.managedState.endWindow();
+
+    testMeta.managedState.latch.await();
+    testMeta.managedState.teardown();
+    Assert.assertEquals("freed bucket", Lists.newArrayList(1L), testMeta.managedState.freedBuckets);
+  }
+
+  @Test
+  public void testMultipleEvictions() throws InterruptedException
+  {
+    testMeta.managedState.latch = new CountDownLatch(2);
+    testMeta.managedState.setup(testMeta.operatorContext);
+
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(1, one, one);
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.put(2, two, two);
+    testMeta.managedState.endWindow();
+
+    testMeta.managedState.latch.await();
+    testMeta.managedState.teardown();
+    Assert.assertEquals("freed bucket", Lists.newArrayList(1L, 2L), testMeta.managedState.freedBuckets);
+  }
+
+  @Test
+  public void testBucketPrevention() throws InterruptedException
+  {
+    testMeta.managedState.setDurationPreventingFreeingSpace(Duration.standardDays(2));
+    testMeta.managedState.setStateTracker(new MockStateTracker());
+    testMeta.managedState.latch = new CountDownLatch(1);
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(1, one, one);
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.put(2, two, two);
+    testMeta.managedState.endWindow();
+
+    testMeta.managedState.latch.await();
+    testMeta.managedState.teardown();
+    Assert.assertEquals("no buckets triggered", 0, testMeta.managedState.freedBuckets.size());
+  }
+
+  private static class MockManagedStateImpl extends ManagedStateImpl
+  {
+    CountDownLatch latch;
+    List<Long> freedBuckets = Lists.newArrayList();
+
+    @Override
+    protected Bucket newBucket(long bucketId)
+    {
+      return new MockDefaultBucket(bucketId);
+    }
+  }
+
+  private static class MockDefaultBucket extends Bucket.DefaultBucket
+  {
+
+    protected MockDefaultBucket(long bucketId)
+    {
+      super(bucketId);
+    }
+
+    @Override
+    public long freeMemory() throws IOException
+    {
+      long freedBytes = super.freeMemory();
+      ((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId());
+      ((MockManagedStateImpl)managedStateContext).latch.countDown();
+      return freedBytes;
+    }
+
+    @Override
+    public long getSizeInBytes()
+    {
+      return 600;
+    }
+  }
+
+  private static class MockStateTracker extends StateTracker
+  {
+
+    @Override
+    public void run()
+    {
+      super.run();
+      ((MockManagedStateImpl)managedStateImpl).latch.countDown();
+    }
+  }
+
+}