Posted to commits@apex.apache.org by th...@apache.org on 2016/03/25 14:15:25 UTC

[2/3] incubator-apex-malhar git commit: APEXMALHAR-1897 added managed state

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
new file mode 100644
index 0000000..708cfeb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This implementation of {@link AbstractManagedStateImpl} lets the client specify the time for each key.
+ * The time value is used to derive the time-bucket of a key.
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements TimeSlicedBucketedState
+{
+  public ManagedTimeStateImpl()
+  {
+    this.numBuckets = 1;
+  }
+
+  @Override
+  public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    putInBucket(bucketId, timeBucket, key, value);
+  }
+
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    return getValueFromBucketSync(bucketId, -1, key);
+  }
+
+  @Override
+  public Slice getSync(long bucketId, long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so no point in looking further.
+      return BucketedState.EXPIRED;
+    }
+    return getValueFromBucketSync(bucketId, timeBucket, key);
+  }
+
+  @Override
+  public Future<Slice> getAsync(long bucketId, Slice key)
+  {
+    return getValueFromBucketAsync(bucketId, -1, key);
+  }
+
+  @Override
+  public Future<Slice> getAsync(long bucketId, long time, Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so no point in looking further.
+      return Futures.immediateFuture(BucketedState.EXPIRED);
+    }
+    return getValueFromBucketAsync(bucketId, timeBucket, key);
+  }
+
+  @Min(1)
+  @Override
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets.
+   *
+   * @param numBuckets number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+
+}
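
A minimal usage sketch of the put/get API added above (illustrative only, not part of this commit; the bucket count, keys and values are arbitrary, and the operator lifecycle calls setup/beginWindow/endWindow are assumed to be driven by the enclosing operator):

  ManagedTimeStateImpl state = new ManagedTimeStateImpl();
  state.setNumBuckets(4);                                   // hypothetical bucket count

  long bucketId = 2;                                        // any bucket in [0, numBuckets)
  long eventTime = System.currentTimeMillis();
  Slice key = new Slice("key-1".getBytes());
  Slice value = new Slice("value-1".getBytes());

  // eventTime is used to derive the time-bucket of the key
  state.put(bucketId, eventTime, key, value);

  // synchronous read; returns BucketedState.EXPIRED if eventTime has already expired
  Slice readBack = state.getSync(bucketId, eventTime, key);

  // asynchronous read; the Future resolves once the value is available
  Future<Slice> futureValue = state.getAsync(bucketId, eventTime, key);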

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
new file mode 100644
index 0000000..6f531eb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * In this implementation of {@link AbstractManagedStateImpl}, the buckets in memory are time-buckets.
+ */
+public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implements BucketedState
+{
+  private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = Queues.newLinkedBlockingQueue();
+  private final transient Set<Bucket> bucketsForTeardown = Sets.newHashSet();
+
+  public ManagedTimeUnifiedStateImpl()
+  {
+    bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
+  }
+
+  @Override
+  public int getNumBuckets()
+  {
+    return timeBucketAssigner.getNumBuckets();
+  }
+
+  @Override
+  public void put(long time, @NotNull Slice key, @NotNull Slice value)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    putInBucket(timeBucket, timeBucket, key, value);
+  }
+
+  @Override
+  public Slice getSync(long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so return expired slice.
+      return BucketedState.EXPIRED;
+    }
+    return getValueFromBucketSync(timeBucket, timeBucket, key);
+  }
+
+  @Override
+  public Future<Slice> getAsync(long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so return expired slice.
+      return Futures.immediateFuture(BucketedState.EXPIRED);
+    }
+    return getValueFromBucketAsync(timeBucket, timeBucket, key);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    Long purgedTimeBucket;
+
+    //collect all the purged time buckets
+    while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) {
+      int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
+      if (buckets[purgedTimeBucketIdx] != null && buckets[purgedTimeBucketIdx].getBucketId() == purgedTimeBucket) {
+        bucketsForTeardown.add(buckets[purgedTimeBucketIdx]);
+        buckets[purgedTimeBucketIdx] = null;
+      }
+    }
+
+    //tear down all the eligible time buckets
+    Iterator<Bucket> bucketIterator = bucketsForTeardown.iterator();
+    while (bucketIterator.hasNext()) {
+      Bucket bucket = bucketIterator.next();
+      if (!tasksPerBucketId.containsKey(bucket.getBucketId())) {
+        //no pending asynchronous queries for this bucket id
+        bucket.teardown();
+        bucketIterator.remove();
+      }
+    }
+  }
+
+  @Override
+  protected void handleBucketConflict(int bucketIdx, long newBucketId)
+  {
+    Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < newBucketId, "new time bucket should have a value"
+        + " greater than the old time bucket");
+    //Time buckets are purged periodically, so a bucket conflict is expected here; the old bucket is simply replaced.
+    bucketsForTeardown.add(buckets[bucketIdx]);
+    buckets[bucketIdx] = newBucket(newBucketId);
+    buckets[bucketIdx].setup(this);
+  }
+
+  @Override
+  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+  {
+    purgedTimeBuckets.add(timeBucket);
+    super.purgeTimeBucketsLessThanEqualTo(timeBucket);
+  }
+
+  /**
+   * This uses the operator id instead of the bucket id as the name of the parent folder of time-buckets, because
+   * multiple partitions may work on the same time-buckets.
+   */
+  private static class TimeUnifiedBucketsFileSystem extends BucketsFileSystem
+  {
+    @Override
+    protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getWriter(managedStateContext.getOperatorContext().getId(), fileName);
+    }
+
+    @Override
+    protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getReader(managedStateContext.getOperatorContext().getId(), fileName);
+    }
+
+    @Override
+    protected void rename(long bucketId, String fromName, String toName) throws IOException
+    {
+      managedStateContext.getFileAccess().rename(managedStateContext.getOperatorContext().getId(), fromName, toName);
+    }
+
+    @Override
+    protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getOutputStream(managedStateContext.getOperatorContext().getId(),
+          fileName);
+    }
+
+    @Override
+    protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getInputStream(managedStateContext.getOperatorContext().getId(),
+          fileName);
+    }
+
+    @Override
+    protected boolean exists(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().exists(managedStateContext.getOperatorContext().getId(),
+          fileName);
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
+    {
+      return managedStateContext.getFileAccess().listFiles(managedStateContext.getOperatorContext().getId());
+    }
+
+    @Override
+    protected void delete(long bucketId, String fileName) throws IOException
+    {
+      managedStateContext.getFileAccess().delete(managedStateContext.getOperatorContext().getId(), fileName);
+    }
+
+    @Override
+    protected void deleteBucket(long bucketId) throws IOException
+    {
+      managedStateContext.getFileAccess().deleteBucket(managedStateContext.getOperatorContext().getId());
+    }
+
+    @Override
+    protected void addBucketName(long bucketId)
+    {
+      long operatorId = (long)managedStateContext.getOperatorContext().getId();
+      if (!bucketNamesOnFS.contains(operatorId)) {
+        bucketNamesOnFS.add(operatorId);
+      }
+    }
+  }
+
+  private static transient Logger LOG = LoggerFactory.getLogger(ManagedTimeUnifiedStateImpl.class);
+}
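
For contrast, a minimal sketch (again illustrative, not part of this commit) of the time-unified variant above, where no bucket id is passed in and the in-memory bucket is the time-bucket itself:

  ManagedTimeUnifiedStateImpl state = new ManagedTimeUnifiedStateImpl();

  long eventTime = System.currentTimeMillis();
  Slice key = new Slice("key-1".getBytes());
  Slice value = new Slice("value-1".getBytes());

  // the time-bucket derived from eventTime doubles as the in-memory bucket
  state.put(eventTime, key, value);

  // both reads report BucketedState.EXPIRED (directly, or wrapped in the Future) when eventTime is expired
  Slice readBack = state.getSync(eventTime, key);
  Future<Slice> futureValue = state.getAsync(eventTime, key);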

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
new file mode 100644
index 0000000..a359b31
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Tracks the size of state in memory and evicts buckets.
+ */
+class StateTracker extends TimerTask
+{
+  //bucket id -> bucket id & time wrapper
+  private final transient ConcurrentHashMap<Long, BucketIdTimeWrapper> bucketAccessTimes = new ConcurrentHashMap<>();
+
+  private transient ConcurrentSkipListSet<BucketIdTimeWrapper> bucketHeap;
+
+  private final transient Timer memoryFreeService = new Timer();
+
+  protected transient AbstractManagedStateImpl managedStateImpl;
+
+  void setup(@NotNull AbstractManagedStateImpl managedStateImpl)
+  {
+    this.managedStateImpl = Preconditions.checkNotNull(managedStateImpl, "managed state impl");
+
+    this.bucketHeap = new ConcurrentSkipListSet<>(
+        new Comparator<BucketIdTimeWrapper>()
+        {
+          //Note: this comparator imposes orderings that are inconsistent with equals.
+          @Override
+          public int compare(BucketIdTimeWrapper o1, BucketIdTimeWrapper o2)
+          {
+            if (o1.getLastAccessedTime() < o2.getLastAccessedTime()) {
+              return -1;
+            }
+            if (o1.getLastAccessedTime() > o2.getLastAccessedTime()) {
+              return 1;
+            }
+
+            return Long.compare(o1.bucketId, o2.bucketId);
+          }
+        });
+    long intervalMillis = managedStateImpl.getCheckStateSizeInterval().getMillis();
+    memoryFreeService.scheduleAtFixedRate(this, intervalMillis, intervalMillis);
+  }
+
+  void bucketAccessed(long bucketId)
+  {
+    BucketIdTimeWrapper idTimeWrapper = bucketAccessTimes.get(bucketId);
+    if (idTimeWrapper != null) {
+      bucketHeap.remove(idTimeWrapper);
+    } else {
+      idTimeWrapper = new BucketIdTimeWrapper(bucketId);
+    }
+    idTimeWrapper.setLastAccessedTime(System.currentTimeMillis());
+    bucketHeap.add(idTimeWrapper);
+  }
+
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void run()
+  {
+    synchronized (managedStateImpl.commitLock) {
+      //freeing of state needs to be paused during commit because commit transfers data into a state that
+      //could otherwise be freed up as well.
+      long bytesSum = 0;
+      for (Bucket bucket : managedStateImpl.buckets) {
+        if (bucket != null) {
+          bytesSum += bucket.getSizeInBytes();
+        }
+      }
+
+      if (bytesSum > managedStateImpl.getMaxMemorySize()) {
+        Duration duration = managedStateImpl.getDurationPreventingFreeingSpace();
+        long durationMillis = 0;
+        if (duration != null) {
+          durationMillis = duration.getMillis();
+        }
+
+        BucketIdTimeWrapper idTimeWrapper;
+        while (bytesSum > managedStateImpl.getMaxMemorySize() && bucketHeap.size() > 0 &&
+            null != (idTimeWrapper = bucketHeap.first())) {
+          //trigger buckets to free space
+
+          if (System.currentTimeMillis() - idTimeWrapper.getLastAccessedTime() < durationMillis) {
+            //if the least recently used bucket cannot free up space because it was accessed within the
+            //specified duration, then no later bucket can either, because this heap is ordered by access time.
+            break;
+          }
+          long bucketId = idTimeWrapper.bucketId;
+          Bucket bucket = managedStateImpl.getBucket(bucketId);
+          if (bucket != null) {
+
+            synchronized (bucket) {
+              long sizeFreed;
+              try {
+                sizeFreed = bucket.freeMemory();
+                LOG.debug("size freed {} {}", bucketId, sizeFreed);
+              } catch (IOException e) {
+                managedStateImpl.throwable.set(e);
+                throw new RuntimeException("freeing " + bucketId, e);
+              }
+              bytesSum -= sizeFreed;
+            }
+            bucketHeap.remove(idTimeWrapper);
+            bucketAccessTimes.remove(bucketId);
+          }
+        }
+      }
+    }
+  }
+
+  void teardown()
+  {
+    memoryFreeService.cancel();
+  }
+
+  /**
+   * Wrapper class for bucket id and the last time the bucket was accessed.
+   */
+  private static class BucketIdTimeWrapper
+  {
+    private final long bucketId;
+    private long lastAccessedTime;
+
+    BucketIdTimeWrapper(long bucketId)
+    {
+      this.bucketId = bucketId;
+    }
+
+    private synchronized long getLastAccessedTime()
+    {
+      return lastAccessedTime;
+    }
+
+    private synchronized void setLastAccessedTime(long lastAccessedTime)
+    {
+      this.lastAccessedTime = lastAccessedTime;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof BucketIdTimeWrapper)) {
+        return false;
+      }
+
+      BucketIdTimeWrapper that = (BucketIdTimeWrapper)o;
+      //Note: the comparator used with bucket heap imposes orderings that are inconsistent with equals
+      return bucketId == that.bucketId;
+
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return (int)(bucketId ^ (bucketId >>> 32));
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(StateTracker.class);
+
+}
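
A standalone sketch (illustrative, not part of this commit) of the least-recently-used ordering that the bucketHeap above relies on; plain long[] pairs of {bucketId, lastAccessedTime} stand in for BucketIdTimeWrapper, and the JDK classes java.util.Comparator and java.util.concurrent.ConcurrentSkipListSet are assumed to be imported:

  Comparator<long[]> byAccessTimeThenId = (a, b) -> {
    int byTime = Long.compare(a[1], b[1]);                   // oldest access time first
    return byTime != 0 ? byTime : Long.compare(a[0], b[0]);  // tie-break on bucket id
  };
  ConcurrentSkipListSet<long[]> bucketHeap = new ConcurrentSkipListSet<>(byAccessTimeThenId);

  bucketHeap.add(new long[]{1L, 1000L});
  bucketHeap.add(new long[]{2L, 500L});
  bucketHeap.add(new long[]{3L, 500L});

  // bucketHeap.first() is {2, 500}: least recently accessed (lowest id on the tie with bucket 3),
  // so it is the first bucket asked to free memory once the size cap is exceeded.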

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
new file mode 100644
index 0000000..745353b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.appdata.query.WindowBoundedService;
+
+/**
+ * Keeps track of time buckets.<br/>
+ *
+ * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
+ * determines which time-bucket an event falls into, and slides the time boundaries.
+ * <p/>
+ *
+ * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default: the system
+ * time when the TimeBucketAssigner is initialized) is used to calculate the number of time-buckets.<br/>
+ * For example, if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
+ * <code>referenceInstant = current-time</code>, then <code>
+ *   numBuckets = 60 minutes / 30 minutes = 2 </code>.<br/>
+ *
+ * Once configured, these properties shouldn't be changed, because a change would map the same (key, time) pair to
+ * a different time-bucket after a failure.
+ * <p/>
+ *
+ * The time boundaries, start and end, periodically move forward by the span of a single time-bucket. Any event with
+ * time < start is expired. The expiry task slides these boundaries asynchronously, between application windows.<br/>
+ * The boundaries move only between application windows to keep a checkpoint consistent: checkpoints happen at
+ * application window boundaries, so if start and end could move within an app window, an old value of 'start' might
+ * be saved with a new value of 'end'.
+ *
+ * <p/>
+ *
+ * The boundaries can also be moved by {@link #getTimeBucketFor(long)}. The time passed to this method can be ahead
+ * of <code>end</code>, which means the corresponding event is a future event (with respect to the TimeBucketAssigner)
+ * and cannot be ignored; it is accounted for by sliding the boundaries further.
+ */
+public class TimeBucketAssigner implements ManagedStateComponent
+{
+  @NotNull
+  private Instant referenceInstant = new Instant();
+
+  @NotNull
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration expireBefore = Duration.standardDays(2);
+
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration bucketSpan;
+
+  private long bucketSpanMillis;
+
+  private long start;
+  private long end;
+  private int numBuckets;
+  private transient long fixedStart;
+  private transient long lowestTimeBucket;
+
+  private boolean initialized;
+
+  private transient WindowBoundedService windowBoundedService;
+
+  private transient PurgeListener purgeListener = null;
+
+  private final transient Runnable expiryTask = new Runnable()
+  {
+    @Override
+    public void run()
+    {
+      synchronized (lock) {
+        start += bucketSpanMillis;
+        end += bucketSpanMillis;
+        if (purgeListener != null) {
+          purgeListener.purgeTimeBucketsLessThanEqualTo(lowestTimeBucket++);
+        }
+      }
+    }
+  };
+
+  private final transient Object lock = new Object();
+
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    Context.OperatorContext context = managedStateContext.getOperatorContext();
+    fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();
+
+    if (!initialized) {
+      if (bucketSpan == null) {
+        bucketSpan = Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+            context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS));
+      }
+      start = fixedStart;
+      bucketSpanMillis = bucketSpan.getMillis();
+      numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
+      end = start + (numBuckets * bucketSpanMillis);
+
+      initialized = true;
+    }
+    lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
+    windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask);
+    windowBoundedService.setup(context);
+  }
+
+  public void beginWindow(long windowId)
+  {
+    windowBoundedService.beginWindow(windowId);
+  }
+
+  public void endWindow()
+  {
+    windowBoundedService.endWindow();
+  }
+
+  /**
+   * Gets the time-bucket key for the given time value.
+   *
+   * @param value time from which the time-bucket key is derived.
+   * @return -1 if the value is already expired; the time-bucket key otherwise.
+   */
+  public long getTimeBucketFor(long value)
+  {
+    synchronized (lock) {
+      if (value < start) {
+        return -1;
+      }
+      long diffFromStart = value - fixedStart;
+      long key = diffFromStart / bucketSpanMillis;
+      if (value > end) {
+        long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis;
+        start += move;
+        end += move;
+      }
+      return key;
+    }
+  }
+
+  public void setPurgeListener(@NotNull PurgeListener purgeListener)
+  {
+    this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowBoundedService.teardown();
+  }
+
+  /**
+   * @return number of buckets.
+   */
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * @return reference instant
+   */
+  public Instant getReferenceInstant()
+  {
+    return referenceInstant;
+  }
+
+  /**
+   * Sets the reference instant (by default the system time when the streaming app is created).
+   * This instant together with {@link #expireBefore} is used to calculate {@link #start} and {@link #end}.
+   *
+   * @param referenceInstant reference instant
+   */
+  public void setReferenceInstant(Instant referenceInstant)
+  {
+    this.referenceInstant = referenceInstant;
+  }
+
+  /**
+   * @return duration before which the data is expired.
+   */
+  public Duration getExpireBefore()
+  {
+    return expireBefore;
+  }
+
+  /**
+   * Sets the duration that denotes expiry. Any event older than this duration is considered to be expired.
+   * @param expireBefore duration
+   */
+  public void setExpireBefore(Duration expireBefore)
+  {
+    this.expireBefore = expireBefore;
+  }
+
+  /**
+   * @return time-bucket span
+   */
+  public Duration getBucketSpan()
+  {
+    return bucketSpan;
+  }
+
+  /**
+   * Sets the length of a time bucket.
+   * @param bucketSpan length of time bucket
+   */
+  public void setBucketSpan(Duration bucketSpan)
+  {
+    this.bucketSpan = bucketSpan;
+  }
+
+  /**
+   * The listener is informed when the time boundaries slide, so that time buckets older than the new smallest
+   * time bucket can be purged.
+   */
+  public interface PurgeListener
+  {
+    void purgeTimeBucketsLessThanEqualTo(long timeBucket);
+  }
+
+}
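
A worked sketch of the bucket arithmetic described in the class javadoc above (values are illustrative and not from this commit; the local names mirror the fields of TimeBucketAssigner):

  long bucketSpanMillis = 30 * 60 * 1000L;                       // bucketSpan = 30 minutes
  long expireBeforeMillis = 60 * 60 * 1000L;                     // expireBefore = 1 hour
  long referenceMillis = System.currentTimeMillis();             // referenceInstant

  long fixedStart = referenceMillis - expireBeforeMillis;
  int numBuckets = (int)((expireBeforeMillis + bucketSpanMillis - 1) / bucketSpanMillis);  // = 2

  // a time-bucket key is the event's offset from fixedStart in units of the bucket span
  long eventTime = referenceMillis + 45 * 60 * 1000L;            // 45 minutes after the reference
  long timeBucket = (eventTime - fixedStart) / bucketSpanMillis; // 105 min / 30 min = 3

  // getTimeBucketFor(eventTime) would also slide start and end forward here because eventTime > end,
  // while any eventTime below the sliding start boundary maps to -1, i.e. expired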

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
new file mode 100644
index 0000000..99c8dd1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Unstable
+package org.apache.apex.malhar.lib.state.managed;
+
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
new file mode 100644
index 0000000..e75d867
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Unstable
+package org.apache.apex.malhar.lib.state;
+
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
new file mode 100644
index 0000000..51e9a13
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+public class BucketsFileSystemTest
+{
+  class TestMeta extends TestWatcher
+  {
+    BucketsFileSystem bucketsFileSystem;
+    String applicationPath;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+
+      managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(7));
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      bucketsFileSystem = new BucketsFileSystem();
+
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testTransferBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testTransferOfExistingBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+
+    Map<Slice, Bucket.BucketedValue> more = ManagedStateTestUtils.getTestBucketData(50, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, more);
+
+    unsavedBucket0.putAll(more);
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testUpdateBucketMetaDataFile() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    BucketsFileSystem.MutableTimeBucketMeta mutableTbm = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    mutableTbm.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+
+    testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+    BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNotNull(immutableTbm);
+    Assert.assertEquals("last transferred window", 10, immutableTbm.getLastTransferredWindowId());
+    Assert.assertEquals("size in bytes", 100, immutableTbm.getSizeInBytes());
+    Assert.assertEquals("first key", "1", immutableTbm.getFirstKey().stringValue());
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testGetTimeBucketMeta() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    BucketsFileSystem.TimeBucketMeta bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNull("bucket meta", bucketMeta);
+
+    testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNotNull("bucket meta not null", bucketMeta);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testGetAllTimeBucketMeta() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    BucketsFileSystem.MutableTimeBucketMeta tbm1 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    tbm1.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+
+    BucketsFileSystem.MutableTimeBucketMeta tbm2 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 2);
+    tbm2.updateTimeBucketMeta(10, 100, new Slice("2".getBytes()));
+
+    testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+        testMeta.bucketsFileSystem.getAllTimeBuckets(1);
+
+    Iterator<BucketsFileSystem.TimeBucketMeta> iterator = timeBucketMetas.iterator();
+    int i = 2;
+    while (iterator.hasNext()) {
+      BucketsFileSystem.TimeBucketMeta tbm = iterator.next();
+      Assert.assertEquals("time bucket " + i, i, tbm.getTimeBucketId());
+      i--;
+    }
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testInvalidateTimeBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    testGetAllTimeBucketMeta();
+    testMeta.bucketsFileSystem.invalidateTimeBucket(1, 1);
+    BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNull("deleted tbm", immutableTbm);
+
+    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+        testMeta.bucketsFileSystem.getAllTimeBuckets(1);
+
+    Assert.assertEquals("only 1 tbm", 1, timeBucketMetas.size());
+    immutableTbm = timeBucketMetas.iterator().next();
+
+    Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId());
+    testMeta.bucketsFileSystem.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
new file mode 100644
index 0000000..cb8a97f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+public class DefaultBucketTest
+{
+
+  class TestMeta extends TestWatcher
+  {
+    Bucket.DefaultBucket defaultBucket;
+    String applicationPath;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      defaultBucket = new Bucket.DefaultBucket(1);
+      managedStateContext.getBucketsFileSystem().setup(managedStateContext);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      managedStateContext.getBucketsFileSystem().teardown();
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testPut()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.defaultBucket.put(one, 1, one);
+
+    Slice value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+
+    value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.READERS);
+    Assert.assertNull("value not present", value);
+
+    Assert.assertEquals("size of bucket", one.length * 2 + 64, testMeta.defaultBucket.getSizeInBytes());
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testGetFromReader() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+
+    Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.READERS);
+    Assert.assertEquals("value one", one, value);
+
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testGetFromSpecificTimeBucket() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+
+    Slice value = testMeta.defaultBucket.get(one, 101, Bucket.ReadSource.READERS);
+    Assert.assertEquals("value one", one, value);
+
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testCheckpointed()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testPut();
+    Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
+    Assert.assertEquals("size", 1, unsaved.size());
+
+    Map.Entry<Slice, Bucket.BucketedValue> entry = unsaved.entrySet().iterator().next();
+    Assert.assertEquals("key", one, entry.getKey());
+    Assert.assertEquals("value", one, entry.getValue().getValue());
+    Assert.assertEquals("time bucket", 1, entry.getValue().getTimeBucket());
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testCommitted()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testCheckpointed();
+    testMeta.defaultBucket.committed(10);
+    Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testCommittedWithOpenReader() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    testGetFromReader();
+    Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders();
+    Assert.assertTrue("reader open", readers.containsKey(101L));
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    testMeta.defaultBucket.put(two, 101, two);
+    Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
+    Assert.assertEquals("size", 1, unsaved.size());
+    testMeta.defaultBucket.committed(10);
+
+    Slice value = testMeta.defaultBucket.get(two, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value two", two, value);
+
+    value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+
+    Assert.assertTrue("reader closed", !readers.containsKey(101L));
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testTeardown() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    testGetFromReader();
+    Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders();
+    Assert.assertTrue("reader open", readers.containsKey(101L));
+
+    testMeta.defaultBucket.teardown();
+    Assert.assertTrue("reader closed", readers.containsKey(101L));
+  }
+
+  @Test
+  public void testFreeMemory() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    testGetFromReader();
+    long initSize = testMeta.defaultBucket.getSizeInBytes();
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.defaultBucket.put(two, 101, two);
+
+    Assert.assertEquals("size", initSize + (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+
+    long sizeFreed = testMeta.defaultBucket.freeMemory();
+    Assert.assertEquals("size freed", initSize, sizeFreed);
+    Assert.assertEquals("existing size", (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+    testMeta.defaultBucket.teardown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
new file mode 100644
index 0000000..ca64693
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import javax.validation.constraints.NotNull;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+public class IncrementalCheckpointManagerTest
+{
+  class TestMeta extends TestWatcher
+  {
+    IncrementalCheckpointManager checkpointManager;
+    String applicationPath;
+    int operatorId = 1;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      Context.OperatorContext operatorContext = ManagedStateTestUtils.getOperatorContext(operatorId, applicationPath);
+      managedStateContext = new MockManagedStateContext(operatorContext);
+
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      checkpointManager = new IncrementalCheckpointManager();
+
+      managedStateContext.getTimeBucketAssigner().setup(managedStateContext);
+      managedStateContext.getBucketsFileSystem().setup(managedStateContext);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      managedStateContext.getTimeBucketAssigner().teardown();
+      managedStateContext.getBucketsFileSystem().teardown();
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    IncrementalCheckpointManager deserialized = KryoCloneUtils.cloneObject(testMeta.checkpointManager);
+    Assert.assertNotNull("state window data manager", deserialized);
+  }
+
+  @Test
+  public void testSave() throws IOException
+  {
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.teardown();
+
+    testMeta.checkpointManager = new IncrementalCheckpointManager();
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+    @SuppressWarnings("unchecked")
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5After = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+        testMeta.checkpointManager.load(testMeta.operatorId, 10);
+
+    Assert.assertEquals("saved", buckets5, buckets5After);
+    testMeta.checkpointManager.teardown();
+  }
+
+  @Test
+  public void testTransferWindowFiles() throws IOException, InterruptedException
+  {
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    //transferWindowFiles needs to be called synchronously here, so shut down the manager's background thread first.
+    testMeta.checkpointManager.teardown();
+    Thread.sleep(500);
+
+    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    testMeta.checkpointManager.transferWindowFiles();
+
+    for (int i = 0; i < 5; i++) {
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i,
+          buckets5.get((long)i), 1);
+    }
+  }
+
+  @Test
+  public void testCommitted() throws IOException, InterruptedException
+  {
+    CountDownLatch latch = new CountDownLatch(5);
+    MockBucketsFileSystem mockBucketsFileSystem = new MockBucketsFileSystem(latch);
+
+    testMeta.managedStateContext.setBucketsFileSystem(mockBucketsFileSystem);
+
+    mockBucketsFileSystem.setup(testMeta.managedStateContext);
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+    Map<Long, Map<Slice, Bucket.BucketedValue>> data = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(data, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    latch.await();
+    testMeta.checkpointManager.teardown();
+    Thread.sleep(500);
+
+    for (int i = 0; i < 5; i++) {
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i, data.get((long)i), 1);
+    }
+  }
+
+  @Test
+  public void testPurge() throws IOException, InterruptedException
+  {
+    FileSystem fileSystem = FileSystem.newInstance(new Configuration());
+
+    testTransferWindowFiles();
+    RemoteIterator<LocatedFileStatus> iterator = fileSystem.listLocatedStatus(
+        new Path(testMeta.applicationPath + "/bucket_data"));
+    Assert.assertTrue(iterator.hasNext());
+
+    testMeta.managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(200);
+
+    iterator = fileSystem.listLocatedStatus(new Path(testMeta.applicationPath + "/bucket_data"));
+    if (iterator.hasNext()) {
+      Assert.fail("All buckets should be deleted");
+    }
+  }
+
+  static class MockBucketsFileSystem extends BucketsFileSystem
+  {
+    private final transient CountDownLatch latch;
+
+    public MockBucketsFileSystem(@NotNull CountDownLatch latch)
+    {
+      super();
+      this.latch = Preconditions.checkNotNull(latch);
+    }
+
+    @Override
+    protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data)
+        throws IOException
+    {
+      super.writeBucketData(windowId, bucketId, data);
+      if (windowId == 10) {
+        latch.countDown();
+      }
+    }
+  }
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManagerTest.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
new file mode 100644
index 0000000..f86b5d3
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+public class ManagedStateImplTest
+{
+
+  class TestMeta extends TestWatcher
+  {
+    ManagedStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new ManagedStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    ManagedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+    Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets());
+  }
+
+  @Test
+  public void testSimplePutGet()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(0, one, one);
+    Slice value = testMeta.managedState.getSync(0, one);
+    testMeta.managedState.endWindow();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromFlash() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(0, one, one);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Slice value = valFuture.get();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testIncrementalCheckpoint()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
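+    //after beforeCheckpoint(time), the data put in that window should be present in the bucket's checkpointed data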
+    Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+    Assert.assertEquals("value of one", one, defaultBucket.getCheckpointedData().get(time).get(one).getValue());
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.beginWindow(time + 1);
+    testMeta.managedState.put(0, two, two);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time + 1);
+
+    Assert.assertEquals("value of two", two, defaultBucket.getCheckpointedData().get(time + 1).get(two).getValue());
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromCheckpoint() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Assert.assertEquals("value of one", one, valFuture.get());
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testCommitted()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    commitHelper(one, two);
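+    //commitHelper marks only the first window as committed, so "one" should be in the committed data while "two" should not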
+    Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+    Assert.assertEquals("value of one", one, defaultBucket.getCommittedData().get(one).getValue());
+
+    Assert.assertNull("value of two", defaultBucket.getCommittedData().get(two));
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromCommitted() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    commitHelper(one, two);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Assert.assertEquals("value of one", one, valFuture.get());
+  }
+
+  private void commitHelper(Slice one, Slice two)
+  {
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    testMeta.managedState.beginWindow(time + 1);
+    testMeta.managedState.put(0, two, two);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time + 1);
+
+    testMeta.managedState.committed(time);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
new file mode 100644
index 0000000..f2251bd
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+import org.junit.Assert;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+public class ManagedStateTestUtils
+{
+
+  static void cleanTargetDir(Description description)
+  {
+    try {
+      File out = new File("target/" + description.getClassName());
+      if (out.exists()) {
+        FileUtils.deleteDirectory(out);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> unsavedBucket,
+      int keysPerTimeBucket) throws IOException
+  {
+    RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
+    TreeMap<Slice, Slice> fromDisk = Maps.newTreeMap(new SliceComparator());
+    int size = 0;
+    while (iterator.hasNext()) {
+      LocatedFileStatus fileStatus = iterator.next();
+
+      String timeBucketStr = fileStatus.getPath().getName();
+      if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) {
+      //ignore the meta file and partially written (.tmp) files
+        continue;
+      }
+
+      LOG.debug("bucket {} time-bucket {}", bucketId, timeBucketStr);
+
+      FileAccess.FileReader reader = fileAccess.getReader(bucketId, timeBucketStr);
+
+      reader.readFully(fromDisk);
+      size += keysPerTimeBucket;
+      Assert.assertEquals("size of bucket " + bucketId, size, fromDisk.size());
+    }
+
+    Assert.assertEquals("size of bucket " + bucketId, unsavedBucket.size(), fromDisk.size());
+
+    Map<Slice, Slice> testBucket = Maps.transformValues(unsavedBucket, new Function<Bucket.BucketedValue, Slice>()
+    {
+      @Override
+      public Slice apply(@Nullable Bucket.BucketedValue input)
+      {
+        assert input != null;
+        return input.getValue();
+      }
+    });
+    Assert.assertEquals("data of bucket" + bucketId, testBucket, fromDisk);
+  }
+
+  static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
+  {
+    Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap();
+    for (int i = startBucket; i < endBucket; i++) {
+      Map<Slice, Bucket.BucketedValue> bucketData = getTestBucketData(keyStart, 100);
+      data.put((long)i, bucketData);
+    }
+    return data;
+  }
+
+  static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
+  {
+    Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
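+    //create 5 keys, each assigned to a consecutive time bucket starting at timeBucketStart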
+    for (int j = 0; j < 5; j++) {
+      Slice keyVal = new Slice(Integer.toString(keyStart).getBytes());
+      bucketData.put(keyVal, new Bucket.BucketedValue(timeBucketStart + j, keyVal));
+      keyStart++;
+    }
+    return bucketData;
+  }
+
+  static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath)
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
+  }
+
+  static Context.OperatorContext getOperatorContext(int operatorId)
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
+  }
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(ManagedStateTestUtils.class);
+
+  static Slice getSliceFor(String x)
+  {
+    return new Slice(x.getBytes());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
new file mode 100644
index 0000000..ac4db39
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+public class ManagedTimeStateImplTest
+{
+  class TestMeta extends TestWatcher
+  {
+    ManagedTimeStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new ManagedTimeStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    ManagedTimeStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+    Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets());
+  }
+
+  @Test
+  public void testAsyncGetFromReaders() throws IOException, ExecutionException, InterruptedException
+  {
+    Slice zero = ManagedStateTestUtils.getSliceFor("0");
+    long time = System.currentTimeMillis();
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+
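+    //write a bucket directly through the buckets file system so that getAsync has to fetch the value from the file readers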
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
+    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
+
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero);
+
+    Assert.assertEquals("value of zero", zero, valFuture.get());
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testPutGetWithTime()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(0, time, one, one);
+    Slice value = testMeta.managedState.getSync(0, time, one);
+    testMeta.managedState.endWindow();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetWithTime() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(0, time, one, one);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, time, one);
+    Slice value = valFuture.get();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testRecovery() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(0, time, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(0);
+
+    testMeta.managedState.teardown();
+
+    //simulate a failure: the operator is re-deployed and must restore its state from the checkpoint.
+    testMeta.managedState.setStateTracker(new StateTracker());
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L);
+    Context.OperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+    testMeta.managedState.setup(operatorContext);
+
+    Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+    Assert.assertEquals("value of one", one, defaultBucket.get(one, time, Bucket.ReadSource.MEMORY));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
new file mode 100644
index 0000000..523a10a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+public class ManagedTimeUnifiedStateImplTest
+{
+  class TestMeta extends TestWatcher
+  {
+    ManagedTimeUnifiedStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new ManagedTimeUnifiedStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(9, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    ManagedTimeUnifiedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+    Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets());
+  }
+
+  @Test
+  public void testSimplePutGet()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(time, one, one);
+    Slice value = testMeta.managedState.getSync(time, one);
+    testMeta.managedState.endWindow();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGet() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(time, one, one);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(time, one);
+    Slice value = valFuture.get();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
+  {
+    Slice zero = ManagedStateTestUtils.getSliceFor("0");
+    long time = System.currentTimeMillis();
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+
+    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
+
+    //write data to disk explicitly
+    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(),
+        unsavedBucket0, 1);
+
+    Slice value = testMeta.managedState.getSync(time, zero);
+
+    Assert.assertEquals("value of zero", zero, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
+  {
+    Slice zero = ManagedStateTestUtils.getSliceFor("0");
+    long time = System.currentTimeMillis();
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+
+    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
+
+    //write data to disk explicitly
+    testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(),
+        unsavedBucket0, 1);
+
+    Future<Slice> valFuture = testMeta.managedState.getAsync(time, zero);
+
+    Assert.assertEquals("value of zero", zero, valFuture.get());
+    testMeta.managedState.teardown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
new file mode 100644
index 0000000..8ae4db7
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Comparator;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+class MockManagedStateContext implements ManagedStateContext
+{
+  private TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
+  private Comparator<Slice> keyComparator = new SliceComparator();
+  private BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
+  private TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+
+  private final Context.OperatorContext operatorContext;
+
+  public MockManagedStateContext(Context.OperatorContext operatorContext)
+  {
+    this.operatorContext = operatorContext;
+  }
+
+  @Override
+  public FileAccess getFileAccess()
+  {
+    return fileAccess;
+  }
+
+  @Override
+  public Comparator<Slice> getKeyComparator()
+  {
+    return keyComparator;
+  }
+
+  public BucketsFileSystem getBucketsFileSystem()
+  {
+    return bucketsFileSystem;
+  }
+
+  @Override
+  public TimeBucketAssigner getTimeBucketAssigner()
+  {
+    return timeBucketAssigner;
+  }
+
+  @Override
+  public Context.OperatorContext getOperatorContext()
+  {
+    return operatorContext;
+  }
+
+  void setFileAccess(TFileImpl.DTFileImpl fileAccess)
+  {
+    this.fileAccess = fileAccess;
+  }
+
+  void setKeyComparator(Comparator<Slice> keyComparator)
+  {
+    this.keyComparator = keyComparator;
+  }
+
+  void setBucketsFileSystem(BucketsFileSystem bucketsFileSystem)
+  {
+    this.bucketsFileSystem = bucketsFileSystem;
+  }
+
+  void setTimeBucketAssigner(TimeBucketAssigner timeBucketAssigner)
+  {
+    this.timeBucketAssigner = timeBucketAssigner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
new file mode 100644
index 0000000..8a3e521
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+public class StateTrackerTest
+{
+  static class TestMeta extends TestWatcher
+  {
+    MockManagedStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new MockManagedStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      managedState.setNumBuckets(2);
+      managedState.setMaxMemorySize(100);
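+      //MockDefaultBucket reports a size of 600 bytes, so this limit is always exceeded and memory is freed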
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testEviction() throws InterruptedException
+  {
+    testMeta.managedState.latch = new CountDownLatch(1);
+    testMeta.managedState.setup(testMeta.operatorContext);
+
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(1, one, one);
+    testMeta.managedState.endWindow();
+
+    testMeta.managedState.latch.await();
+    testMeta.managedState.teardown();
+    Assert.assertEquals("freed bucket", Lists.newArrayList(1L), testMeta.managedState.freedBuckets);
+  }
+
+  @Test
+  public void testMultipleEvictions() throws InterruptedException
+  {
+    testMeta.managedState.latch = new CountDownLatch(2);
+    testMeta.managedState.setup(testMeta.operatorContext);
+
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(1, one, one);
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.put(2, two, two);
+    testMeta.managedState.endWindow();
+
+    testMeta.managedState.latch.await();
+    testMeta.managedState.teardown();
+    Assert.assertEquals("freed bucket", Lists.newArrayList(1L, 2L), testMeta.managedState.freedBuckets);
+  }
+
+  @Test
+  public void testBucketPrevention() throws InterruptedException
+  {
+    testMeta.managedState.setDurationPreventingFreeingSpace(Duration.standardDays(2));
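+    //buckets accessed within the last 2 days cannot be freed, so no eviction should take place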
+    testMeta.managedState.setStateTracker(new MockStateTracker());
+    testMeta.managedState.latch = new CountDownLatch(1);
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(1, one, one);
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.put(2, two, two);
+    testMeta.managedState.endWindow();
+
+    testMeta.managedState.latch.await();
+    testMeta.managedState.teardown();
+    Assert.assertEquals("no buckets triggered", 0, testMeta.managedState.freedBuckets.size());
+  }
+
+  private static class MockManagedStateImpl extends ManagedStateImpl
+  {
+    CountDownLatch latch;
+    List<Long> freedBuckets = Lists.newArrayList();
+
+    @Override
+    protected Bucket newBucket(long bucketId)
+    {
+      return new MockDefaultBucket(bucketId);
+    }
+  }
+
+  private static class MockDefaultBucket extends Bucket.DefaultBucket
+  {
+
+    protected MockDefaultBucket(long bucketId)
+    {
+      super(bucketId);
+    }
+
+    @Override
+    public long freeMemory() throws IOException
+    {
+      long freedBytes = super.freeMemory();
+      ((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId());
+      ((MockManagedStateImpl)managedStateContext).latch.countDown();
+      return freedBytes;
+    }
+
+    @Override
+    public long getSizeInBytes()
+    {
+      return 600;
+    }
+  }
+
+  private static class MockStateTracker extends StateTracker
+  {
+
+    @Override
+    public void run()
+    {
+      super.run();
+      ((MockManagedStateImpl)managedStateImpl).latch.countDown();
+    }
+  }
+
+}