You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/16 04:46:05 UTC

apex-malhar git commit: APEXMALHAR-2301 Refactor timebucketassigner to add a single timebucket assigner and change the timebucket metadata from array to map to handle unbounded time buckets

Repository: apex-malhar
Updated Branches:
  refs/heads/master 52510b0f8 -> 4ab457f18


APEXMALHAR-2301 Refactor timebucketassigner to add a single timebucket assigner and change the timebucket metadata from array to map to handle unbounded time buckets

This closes #503


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4ab457f1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4ab457f1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4ab457f1

Branch: refs/heads/master
Commit: 4ab457f18a8af8892d8f3f4a5f6e6c2eb50995ac
Parents: 52510b0
Author: Siyuan Hua <hs...@apache.org>
Authored: Sun Nov 20 22:01:59 2016 -0800
Committer: David Yan <da...@apache.org>
Committed: Sun Jan 15 20:44:06 2017 -0800

----------------------------------------------------------------------
 .../apex/malhar/lib/dedup/AbstractDeduper.java  |   4 +-
 .../malhar/lib/dedup/BoundedDedupOperator.java  |   4 +-
 .../lib/dedup/TimeBasedDedupOperator.java       |   6 +-
 .../AbstractManagedStateInnerJoinOperator.java  |   8 +-
 .../state/managed/AbstractManagedStateImpl.java |  64 ++++---
 .../lib/state/managed/ManagedStateImpl.java     |   4 +-
 .../lib/state/managed/ManagedTimeStateImpl.java |  15 +-
 .../managed/ManagedTimeStateMultiValue.java     |   8 +-
 .../managed/ManagedTimeUnifiedStateImpl.java    |  38 ++--
 .../MovingBoundaryTimeBucketAssigner.java       | 180 +++++++++++++++++++
 .../malhar/lib/state/managed/StateTracker.java  |   2 +-
 .../lib/state/managed/TimeBucketAssigner.java   | 167 ++++-------------
 .../malhar/lib/state/managed/TimeExtractor.java |  23 +++
 .../managed/UnboundedTimeBucketAssigner.java    |  70 ++++++++
 .../spillable/SpillableSetMultimapImpl.java     |  27 +--
 .../impl/SpillableSessionWindowedStorage.java   |   6 +-
 .../ManagedTimeUnifiedStateImplTest.java        |  12 +-
 .../state/managed/MockManagedStateContext.java  |   6 +-
 .../MovingBoundaryTimeBucketAssignerTest.java   | 116 ++++++++++++
 .../state/managed/TimeBucketAssignerTest.java   | 116 ------------
 .../malhar/lib/window/WindowedOperatorTest.java |   3 +-
 21 files changed, 526 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index c73cea7..bf81fde 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.apex.malhar.lib.state.BucketedState;
 import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
-import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.Path;
 
@@ -54,7 +54,7 @@ import com.datatorrent.netlet.util.Slice;
 
 /**
  * Abstract class which allows de-duplicating incoming tuples based on a configured key.
- * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link MovingBoundaryTimeBucketAssigner}
  * in {@link ManagedTimeUnifiedStateImpl}
  * Following steps are used in identifying the state of a particular tuple:
  * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
index 7763103..e2a1297 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Future;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
-import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
 
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
@@ -110,7 +110,7 @@ public class BoundedDedupOperator extends AbstractDeduper<Object>
       numBuckets = DEFAULT_NUM_BUCKETS;
     }
     ((ManagedTimeStateImpl)managedState).setNumBuckets(numBuckets);
-    TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+    MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
     managedState.setTimeBucketAssigner(timeBucketAssigner);
     super.setup(context);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
index 3f888cc..3b5f5e2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
@@ -25,7 +25,7 @@ import javax.validation.constraints.NotNull;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
-import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
 import com.datatorrent.api.Context;
@@ -64,7 +64,7 @@ import com.datatorrent.netlet.util.Slice;
  * 3. {@link #referenceInstant} - The reference point from which to start the time which is used for expiry.
  * Setting the {@link #referenceInstant} to say, r seconds from the epoch, would initialize the start of expiry
  * to be from that instant = r. The start and end of the expiry window periodically move by the span of a single
- * bucket. Refer {@link TimeBucketAssigner} for details.
+ * bucket. Refer {@link MovingBoundaryTimeBucketAssigner} for details.
  *
  * Additionally, it also needs the following parameters:
  * 1. {@link #keyExpression} - The java expression to extract the key fields in the incoming tuple (POJO)
@@ -151,7 +151,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
   @Override
   public void setup(OperatorContext context)
   {
-    TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+    MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
     timeBucketAssigner.setBucketSpan(Duration.standardSeconds(bucketSpan));
     timeBucketAssigner.setExpireBefore(Duration.standardSeconds(expireBefore));
     timeBucketAssigner.setReferenceInstant(new Instant(referenceInstant * 1000));

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
index c82c3e3..453da80 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
 import org.joda.time.Duration;
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.hadoop.fs.Path;
 import com.google.common.collect.Maps;
@@ -68,12 +69,13 @@ public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac
     stream2Store = new ManagedTimeStateImpl();
     stream1Store.setNumBuckets(noOfBuckets);
     stream2Store.setNumBuckets(noOfBuckets);
+    assert stream1Store.getTimeBucketAssigner() == stream2Store.getTimeBucketAssigner();
     if (bucketSpanTime != null) {
       stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
-      stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
     }
-    stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
-    stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+    if (stream1Store.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner) {
+      ((MovingBoundaryTimeBucketAssigner)stream1Store.getTimeBucketAssigner()).setExpireBefore(Duration.millis(getExpiryTime()));
+    }
 
     stream1Data = new ManagedTimeStateMultiValue(stream1Store, !isLeftKeyPrimary());
     stream2Data = new ManagedTimeStateMultiValue(stream2Store, !isRightKeyPrimary());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 364bc19..daae2d8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -20,6 +20,7 @@ package org.apache.apex.malhar.lib.state.managed;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -128,14 +129,14 @@ public abstract class AbstractManagedStateImpl
 {
   private long maxMemorySize;
 
-  protected int numBuckets;
+  protected long numBuckets;
 
   @NotNull
   private FileAccess fileAccess = new TFileImpl.DTFileImpl();
   @NotNull
-  protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+  protected TimeBucketAssigner timeBucketAssigner;
 
-  protected Bucket[] buckets;
+  protected Map<Long, Bucket> buckets;
 
   @Min(1)
   private int numReaders = 1;
@@ -176,6 +177,11 @@ public abstract class AbstractManagedStateImpl
     operatorContext = context;
     fileAccess.init();
 
+    if (timeBucketAssigner == null) {
+      // set default time bucket assigner
+      MovingBoundaryTimeBucketAssigner movingBoundaryTimeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
+      setTimeBucketAssigner(movingBoundaryTimeBucketAssigner);
+    }
     timeBucketAssigner.setPurgeListener(this);
 
     //setup all the managed state components
@@ -184,11 +190,11 @@ public abstract class AbstractManagedStateImpl
     bucketsFileSystem.setup(this);
 
     if (buckets == null) {
-      //create buckets array only once at start when it is not created.
+      //create buckets map only once at start if it is not created.
       numBuckets = getNumBuckets();
-      buckets = new Bucket[numBuckets];
+      buckets = new HashMap<>();
     }
-    for (Bucket bucket : buckets) {
+    for (Bucket bucket : buckets.values()) {
       if (bucket != null) {
         bucket.setup(this);
       }
@@ -210,8 +216,8 @@ public abstract class AbstractManagedStateImpl
               stateEntry.getValue();
           if (state != null && !state.isEmpty()) {
             for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> bucketEntry : state.entrySet()) {
-              int bucketIdx = prepareBucket(bucketEntry.getKey());
-              buckets[bucketIdx].recoveredData(stateEntry.getKey(), bucketEntry.getValue());
+              long bucketIdx = prepareBucket(bucketEntry.getKey());
+              buckets.get(bucketIdx).recoveredData(stateEntry.getKey(), bucketEntry.getValue());
             }
           }
           checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/);
@@ -231,7 +237,7 @@ public abstract class AbstractManagedStateImpl
    *
    * @return number of buckets.
    */
-  public abstract int getNumBuckets();
+  public abstract long getNumBuckets();
 
   public void beginWindow(long windowId)
   {
@@ -246,17 +252,17 @@ public abstract class AbstractManagedStateImpl
    * @param bucketId bucket key
    * @return bucket index
    */
-  protected int prepareBucket(long bucketId)
+  protected long prepareBucket(long bucketId)
   {
     stateTracker.bucketAccessed(bucketId);
-    int bucketIdx = getBucketIdx(bucketId);
+    long bucketIdx = getBucketIdx(bucketId);
 
-    Bucket bucket = buckets[bucketIdx];
+    Bucket bucket = buckets.get(bucketIdx);
     if (bucket == null) {
       //bucket is not in memory
       bucket = newBucket(bucketId);
       bucket.setup(this);
-      buckets[bucketIdx] = bucket;
+      buckets.put(bucketIdx, bucket);
     } else if (bucket.getBucketId() != bucketId) {
       handleBucketConflict(bucketIdx, bucketId);
     }
@@ -269,13 +275,13 @@ public abstract class AbstractManagedStateImpl
     Preconditions.checkNotNull(value, "value");
     if (timeBucket != -1) {
       //time bucket is invalid data is not stored
-      int bucketIdx = prepareBucket(bucketId);
+      long bucketIdx = prepareBucket(bucketId);
       //synchronization on a bucket isn't required for put because the event is added to flash which is
       //a concurrent map. The assumption here is that the calls to put & get(sync/async) are being made synchronously by
       //a single thread (operator thread). The get(sync/async) always checks memory first synchronously.
       //If the key is not in the memory, then the async get will uses other reader threads which will fetch it from
       //the files.
-      buckets[bucketIdx].put(key, timeBucket, value);
+      buckets.get(bucketIdx).put(key, timeBucket, value);
     }
   }
 
@@ -283,8 +289,8 @@ public abstract class AbstractManagedStateImpl
   protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key)
   {
     Preconditions.checkNotNull(key, "key");
-    int bucketIdx = prepareBucket(bucketId);
-    Bucket bucket = buckets[bucketIdx];
+    long bucketIdx = prepareBucket(bucketId);
+    Bucket bucket = buckets.get(bucketIdx);
     synchronized (bucket) {
       return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
     }
@@ -294,8 +300,8 @@ public abstract class AbstractManagedStateImpl
   protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key)
   {
     Preconditions.checkNotNull(key, "key");
-    int bucketIdx = prepareBucket(bucketId);
-    Bucket bucket = buckets[bucketIdx];
+    long bucketIdx = prepareBucket(bucketId);
+    Bucket bucket = buckets.get(bucketIdx);
     synchronized (bucket) {
       Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY);
       if (cachedVal != null) {
@@ -307,20 +313,20 @@ public abstract class AbstractManagedStateImpl
     }
   }
 
-  protected void handleBucketConflict(int bucketIdx, long newBucketId)
+  protected void handleBucketConflict(long bucketIdx, long newBucketId)
   {
-    throw new IllegalArgumentException("bucket conflict " + buckets[bucketIdx].getBucketId() + " " + newBucketId);
+    throw new IllegalArgumentException("bucket conflict " + buckets.get(bucketIdx).getBucketId() + " " + newBucketId);
   }
 
-  protected int getBucketIdx(long bucketId)
+  protected long getBucketIdx(long bucketId)
   {
-    return (int)Math.abs(bucketId % numBuckets);
+    return Math.abs(bucketId % numBuckets);
   }
 
   @Override
   public Bucket getBucket(long bucketId)
   {
-    return buckets[getBucketIdx(bucketId)];
+    return buckets.get(getBucketIdx(bucketId));
   }
 
   @Override
@@ -330,7 +336,7 @@ public abstract class AbstractManagedStateImpl
     if (b == null) {
       b = newBucket(bucketId);
       b.setup(this);
-      buckets[getBucketIdx(bucketId)] = b;
+      buckets.put(getBucketIdx(bucketId), b);
     }
     return b;
   }
@@ -351,7 +357,7 @@ public abstract class AbstractManagedStateImpl
   {
     Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap();
 
-    for (Bucket bucket : buckets) {
+    for (Bucket bucket : buckets.values()) {
       if (bucket != null) {
         synchronized (bucket) {
           Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId);
@@ -381,7 +387,7 @@ public abstract class AbstractManagedStateImpl
   {
     synchronized (commitLock) {
       try {
-        for (Bucket bucket : buckets) {
+        for (Bucket bucket : buckets.values()) {
           if (bucket != null) {
             synchronized (bucket) {
               bucket.committed(windowId);
@@ -402,7 +408,7 @@ public abstract class AbstractManagedStateImpl
   public Map<Long, Long> getBucketMemoryUsage()
   {
     Map<Long, Long> bucketToSize = Maps.newHashMap();
-    for (Bucket bucket : buckets) {
+    for (Bucket bucket : buckets.values()) {
       if (bucket == null) {
         continue;
       }
@@ -419,7 +425,7 @@ public abstract class AbstractManagedStateImpl
     bucketsFileSystem.teardown();
     timeBucketAssigner.teardown();
     readerService.shutdownNow();
-    for (Bucket bucket : buckets) {
+    for (Bucket bucket : buckets.values()) {
       if (bucket != null) {
         synchronized (bucket) {
           bucket.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
index ba8cdc6..bf2209f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -55,7 +55,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket
   @Override
   public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = timeBucketAssigner.getTimeBucket(time);
     putInBucket(bucketId, timeBucket, key, value);
   }
 
@@ -88,7 +88,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket
 
   @Min(1)
   @Override
-  public int getNumBuckets()
+  public long getNumBuckets()
   {
     return numBuckets;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
index eddc736..caa5e2b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
@@ -28,6 +28,7 @@ import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState;
 
 import com.google.common.util.concurrent.Futures;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.netlet.util.Slice;
 
@@ -48,7 +49,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
   @Override
   public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = timeBucketAssigner.getTimeBucket(time);
     putInBucket(bucketId, timeBucket, key, value);
   }
 
@@ -61,7 +62,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
   @Override
   public Slice getSync(long bucketId, long time, @NotNull Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = timeBucketAssigner.getTimeBucket(time);
     if (timeBucket == -1) {
       //time is expired so no point in looking further.
       return BucketedState.EXPIRED;
@@ -78,7 +79,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
   @Override
   public Future<Slice> getAsync(long bucketId, long time, Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = timeBucketAssigner.getTimeBucket(time);
     if (timeBucket == -1) {
       //time is expired so no point in looking further.
       return Futures.immediateFuture(BucketedState.EXPIRED);
@@ -88,11 +89,17 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti
 
   @Min(1)
   @Override
-  public int getNumBuckets()
+  public long getNumBuckets()
   {
     return numBuckets;
   }
 
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+  }
+
   /**
    * Sets the number of buckets.
    *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
index bd3319f..19a6abc 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -182,7 +182,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
   {
     if (isKeyContainsMultiValue) {
       Slice keySlice = streamCodec.toByteArray(k);
-      int bucketId = getBucketId(k);
+      long bucketId = getBucketId(k);
       Slice valueSlice = store.getSync(bucketId, keySlice);
       List<V> listOb;
       if (valueSlice == null || valueSlice.length == 0) {
@@ -207,7 +207,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
   {
     if (isKeyContainsMultiValue) {
       Slice keySlice = streamCodec.toByteArray(k);
-      int bucketId = getBucketId(k);
+      long bucketId = getBucketId(k);
       Slice valueSlice = store.getSync(bucketId, keySlice);
       List<V> listOb;
       if (valueSlice == null || valueSlice.length == 0) {
@@ -232,7 +232,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
    */
   private boolean insertInStore(long bucketId, long timeBucket, Slice keySlice, Slice valueSlice)
   {
-    long timeBucketId = store.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(timeBucket);
+    long timeBucketId = store.getTimeBucketAssigner().getTimeBucket(timeBucket);
     if (timeBucketId != -1) {
       store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice);
       return true;
@@ -270,7 +270,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM
     throw new UnsupportedOperationException();
   }
 
-  public int getBucketId(K k)
+  public long getBucketId(K k)
   {
     return k.hashCode() % store.getNumBuckets();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index e04286e..d558eee 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.lib.fileaccess.FileAccess;
 import com.datatorrent.netlet.util.Slice;
 
@@ -59,7 +60,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   }
 
   @Override
-  public int getNumBuckets()
+  public long getNumBuckets()
   {
     return timeBucketAssigner.getNumBuckets();
   }
@@ -67,14 +68,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   @Override
   public void put(long time, @NotNull Slice key, @NotNull Slice value)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = timeBucketAssigner.getTimeBucket(time);
     putInBucket(timeBucket, timeBucket, key, value);
   }
 
   @Override
   public Slice getSync(long time, @NotNull Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = timeBucketAssigner.getTimeBucket(time);
     if (timeBucket == -1) {
       //time is expired so return expired slice.
       return BucketedState.EXPIRED;
@@ -85,7 +86,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   @Override
   public Future<Slice> getAsync(long time, @NotNull Slice key)
   {
-    long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = timeBucketAssigner.getTimeBucket(time);
     if (timeBucket == -1) {
       //time is expired so return expired slice.
       return Futures.immediateFuture(BucketedState.EXPIRED);
@@ -101,10 +102,10 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
 
     //collect all the purged time buckets
     while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) {
-      int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
-      if (buckets[purgedTimeBucketIdx] != null && buckets[purgedTimeBucketIdx].getBucketId() == purgedTimeBucket) {
-        bucketsForTeardown.add(buckets[purgedTimeBucketIdx]);
-        buckets[purgedTimeBucketIdx] = null;
+      long purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
+      if (buckets.containsKey(purgedTimeBucketIdx) && buckets.get(purgedTimeBucketIdx).getBucketId() == purgedTimeBucket) {
+        bucketsForTeardown.add(buckets.get(purgedTimeBucketIdx));
+        buckets.remove(purgedTimeBucketIdx);
       }
     }
 
@@ -121,14 +122,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   }
 
   @Override
-  protected void handleBucketConflict(int bucketIdx, long newBucketId)
+  protected void handleBucketConflict(long bucketId, long newBucketId)
   {
-    Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < newBucketId, "new time bucket should have a value"
+    Preconditions.checkArgument(buckets.get(bucketId).getBucketId() < newBucketId, "new time bucket should have a value"
         + " greater than the old time bucket");
     //Time buckets are purged periodically so here a bucket conflict is expected and so we just ignore conflicts.
-    bucketsForTeardown.add(buckets[bucketIdx]);
-    buckets[bucketIdx] = newBucket(newBucketId);
-    buckets[bucketIdx].setup(this);
+    bucketsForTeardown.add(buckets.get(bucketId));
+    buckets.put(bucketId, newBucket(newBucketId));
+    buckets.get(bucketId).setup(this);
   }
 
   @Override
@@ -138,6 +139,17 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
     super.purgeTimeBucketsLessThanEqualTo(timeBucket);
   }
 
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // default to UnboundedTimeBucketAssigner when no time bucket assigner has been configured
+    if (timeBucketAssigner == null) {
+      UnboundedTimeBucketAssigner unboundedTimeBucketAssigner = new UnboundedTimeBucketAssigner();
+      setTimeBucketAssigner(unboundedTimeBucketAssigner);
+    }
+    super.setup(context);
+  }
+
   /**
    * This uses operator id instead of bucket id as the name of parent folder of time-buckets. This is because
    * multiple partitions may work on same time-buckets.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
new file mode 100644
index 0000000..ece7686
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * Keeps track of time buckets and triggers purging of obsolete time-buckets that moved out of boundary.<br/>
+ *
+ * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
+ * which time-bucket an event falls into and sliding the time boundaries.
+ * <p/>
+ *
+ * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
+ * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/>
+ * For example, if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
+ * <code>referenceInstant = currentTime</code>, then <code>
+ *   numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/>
+ *
+ * These properties once configured shouldn't be changed because that will result in different time-buckets
+ * for the same (key,time) pair after a failure.
+ * <p/>
+ *
+ * The time boundaries, start and end, move by multiples of the time-bucket span. Any event with time &lt; start
+ * is considered expired. The boundaries are slid by {@link #getTimeBucket(long)}. The time which is passed as an
+ * argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event
+ * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
+ *
+ */
+public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
+{
+  private long start;
+
+  private long end;
+
+  @NotNull
+  private Instant referenceInstant = new Instant();
+
+  @NotNull
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration expireBefore = Duration.standardDays(2);
+
+  private long bucketSpanMillis;
+
+  private int numBuckets;
+  private transient long fixedStart;
+  private transient boolean triggerPurge;
+  private transient long lowestPurgeableTimeBucket;
+
+
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    super.setup(managedStateContext);
+    fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();
+
+    if (!isInitialized()) {
+
+      start = fixedStart;
+      bucketSpanMillis = getBucketSpan().getMillis();
+      numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
+      end = start + (numBuckets * bucketSpanMillis);
+
+      setInitialized(true);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (triggerPurge && getPurgeListener() != null) {
+      triggerPurge = false;
+      getPurgeListener().purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * Get the bucket key for the given time and adjust boundaries if necessary.
+   *
+   * @param time time from which bucket key is derived.
+   * @return -1 if time is already expired; bucket key otherwise.
+   */
+  @Override
+  public long getTimeBucket(long time)
+  {
+    if (time < start) {
+      return -1;
+    }
+    long diffFromStart = time - fixedStart;
+    long key = diffFromStart / bucketSpanMillis;
+    if (time >= end) {
+      long diffInBuckets = (time - end) / bucketSpanMillis;
+      long move = (diffInBuckets + 1) * bucketSpanMillis;
+      start += move;
+      end += move;
+      triggerPurge = true;
+      lowestPurgeableTimeBucket += diffInBuckets;
+    }
+    return key;
+
+  }
+
+  /**
+   * @return number of buckets.
+   */
+  @Override
+  public long getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * @return reference instant
+   */
+  public Instant getReferenceInstant()
+  {
+    return referenceInstant;
+  }
+
+  /**
+   * Sets the reference instant (by default the system time when the streaming app is created).
+   * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}.
+   *
+   * @param referenceInstant
+   */
+  public void setReferenceInstant(Instant referenceInstant)
+  {
+    this.referenceInstant = referenceInstant;
+  }
+
+  /**
+   * @return duration before which the data is expired.
+   */
+  public Duration getExpireBefore()
+  {
+    return expireBefore;
+  }
+
+  /**
+   * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired.
+   * @param expireBefore duration
+   */
+  public void setExpireBefore(Duration expireBefore)
+  {
+    this.expireBefore = expireBefore;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 225439f..7cab41c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -87,7 +87,7 @@ class StateTracker extends TimerTask
       //freeing of state needs to be stopped during commit as commit results in transferring data to a state which
       // can be freed up as well.
       long bytesSum = 0;
-      for (Bucket bucket : managedStateImpl.buckets) {
+      for (Bucket bucket : managedStateImpl.buckets.values()) {
         if (bucket != null) {
           bytesSum += bucket.getSizeInBytes();
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
index d218b37..2cae914 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -21,7 +21,8 @@ package org.apache.apex.malhar.lib.state.managed;
 import javax.validation.constraints.NotNull;
 
 import org.joda.time.Duration;
-import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
 
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
@@ -30,180 +31,79 @@ import com.google.common.base.Preconditions;
 import com.datatorrent.api.Context;
 
 /**
- * Keeps track of time buckets and triggers purging of obsolete time-buckets.<br/>
- *
- * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
- * which time-bucket an event falls into and sliding the time boundaries.
- * <p/>
- *
- * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
- * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/>
- * For eg. if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
- * <code>rererenceInstant = currentTime</code>, then <code>
- *   numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/>
- *
- * These properties once configured shouldn't be changed because that will result in different time-buckets
- * for the same (key,time) pair after a failure.
- * <p/>
- *
- * The time boundaries- start and end, move by multiples of time-bucket span. Any event with time < start
- * is considered expired. The boundaries slide by {@link #getTimeBucketAndAdjustBoundaries(long)}. The time which is passed as an
- * argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event
- * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
- *
- * @since 3.4.0
+ * Abstract class to extract a bucket for a given time
  */
-public class TimeBucketAssigner implements ManagedStateComponent
+public abstract class TimeBucketAssigner implements ManagedStateComponent, WindowListener
 {
-  @NotNull
-  private Instant referenceInstant = new Instant();
+  private transient PurgeListener purgeListener;
 
-  @NotNull
-  @FieldSerializer.Bind(JavaSerializer.class)
-  private Duration expireBefore = Duration.standardDays(2);
+  private boolean initialized;
 
   @FieldSerializer.Bind(JavaSerializer.class)
   private Duration bucketSpan;
 
-  private long bucketSpanMillis;
-
-  private long start;
-  private long end;
-  private int numBuckets;
-  private transient long fixedStart;
-  private transient boolean triggerPurge;
-  private transient long lowestPurgeableTimeBucket;
-
-  private boolean initialized;
-
-  private transient PurgeListener purgeListener;
-
   @Override
   public void setup(@NotNull ManagedStateContext managedStateContext)
   {
     Context.OperatorContext context = managedStateContext.getOperatorContext();
-    fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();
-
-    if (!initialized) {
-      if (bucketSpan == null) {
-        bucketSpan = Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
-            context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS));
-      }
-      start = fixedStart;
-      bucketSpanMillis = bucketSpan.getMillis();
-      numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
-      end = start + (numBuckets * bucketSpanMillis);
-
-      initialized = true;
-    }
-  }
-
-  public void endWindow()
-  {
-    if (triggerPurge && purgeListener != null) {
-      triggerPurge = false;
-      purgeListener.purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket);
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-
-  /**
-   * Get the bucket key for the long value and adjust boundaries if necessary.
-   *
-   * @param value value from which bucket key is derived.
-   * @return -1 if value is already expired; bucket key otherwise.
-   */
-  public long getTimeBucketAndAdjustBoundaries(long value)
-  {
-    if (value < start) {
-      return -1;
-    }
-    long diffFromStart = value - fixedStart;
-    long key = diffFromStart / bucketSpanMillis;
-    if (value >= end) {
-      long diffInBuckets = (value - end) / bucketSpanMillis;
-      long move = (diffInBuckets + 1) * bucketSpanMillis;
-      start += move;
-      end += move;
-      triggerPurge = true;
-      lowestPurgeableTimeBucket += diffInBuckets;
+    if (!initialized && bucketSpan == null) {
+      setBucketSpan(Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+          context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)));
     }
-    return key;
-
   }
 
   /**
-   * Sets the purge listener.
-   * @param purgeListener purge listener
+   * Get the time bucket for any given time
+   * @param time time for which the time bucket is requested
+   * @return the time bucket that the given time is assigned to
    */
-  public void setPurgeListener(@NotNull PurgeListener purgeListener)
-  {
-    this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
-  }
+  public abstract long getTimeBucket(long time);
 
   /**
-   * @return number of buckets.
+   * Get the possible number of buckets
+   * @return possible number of buckets
    */
-  public int getNumBuckets()
-  {
-    return numBuckets;
-  }
+  public abstract long getNumBuckets();
 
   /**
-   * @return reference instant
+   * @return time-bucket span
    */
-  public Instant getReferenceInstant()
+  public Duration getBucketSpan()
   {
-    return referenceInstant;
+    return bucketSpan;
   }
 
   /**
-   * Sets the reference instant (by default the system time when the streaming app is created).
-   * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}.
-   *
-   * @param referenceInstant
+   * Sets the length of a time bucket.
+   * @param bucketSpan length of time bucket
    */
-  public void setReferenceInstant(Instant referenceInstant)
+  public void setBucketSpan(Duration bucketSpan)
   {
-    this.referenceInstant = referenceInstant;
+    this.bucketSpan = bucketSpan;
   }
 
-  /**
-   * @return duration before which the data is expired.
-   */
-  public Duration getExpireBefore()
+  public boolean isInitialized()
   {
-    return expireBefore;
+    return initialized;
   }
 
-  /**
-   * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired.
-   * @param expireBefore duration
-   */
-  public void setExpireBefore(Duration expireBefore)
+  public void setInitialized(boolean initialized)
   {
-    this.expireBefore = expireBefore;
+    this.initialized = initialized;
   }
 
   /**
-   * @return time-bucket span
+   * Sets the purge listener.
+   * @param purgeListener purge listener
    */
-  public Duration getBucketSpan()
+  public void setPurgeListener(@NotNull PurgeListener purgeListener)
   {
-    return bucketSpan;
+    this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
   }
 
-  /**
-   * Sets the length of a time bucket.
-   * @param bucketSpan length of time bucket
-   */
-  public void setBucketSpan(Duration bucketSpan)
+  public PurgeListener getPurgeListener()
   {
-    this.bucketSpan = bucketSpan;
+    return purgeListener;
   }
 
   /**
@@ -214,5 +114,4 @@ public class TimeBucketAssigner implements ManagedStateComponent
   {
     void purgeTimeBucketsLessThanEqualTo(long timeBucket);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
index 5d706db..5a2fa36 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java
@@ -26,4 +26,27 @@ package org.apache.apex.malhar.lib.state.managed;
 public interface TimeExtractor<T>
 {
   long getTime(T t);
+
+  class FixedTimeExtractor<V> implements TimeExtractor<V>
+  {
+
+    private long fixedTime;
+
+    public FixedTimeExtractor(long fixedTime)
+    {
+      this.fixedTime = fixedTime;
+    }
+
+    private FixedTimeExtractor()
+    {
+      // For kryo
+    }
+
+    @Override
+    public long getTime(V v)
+    {
+      return fixedTime;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java
new file mode 100644
index 0000000..2027249
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Simple time bucket assigner that assigns a time bucket to any given time. <br>
+ * The algorithm simply rounds the time down to a multiple of the time bucket span. <br>
+ * Ex. given time bucket span is 1000 milliseconds <br>
+ * All times 1000, 1001 ... 1999 will be assigned to time bucket 1000 <br>
+ *
+ *
+ */
+public class UnboundedTimeBucketAssigner extends TimeBucketAssigner
+{
+  @Override
+  public long getTimeBucket(long time)
+  {
+
+    Preconditions.checkArgument(time >= 0, "Time: %s is illegal", time);
+    return time - time % getBucketSpan().getMillis();
+  }
+
+  @Override
+  public long getNumBuckets()
+  {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    super.setup(managedStateContext);
+    setInitialized(true);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 24cc8b2..cc976f1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -57,29 +57,6 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     Spillable.SpillableComponent
 {
 
-  private static class FixedTimeExtractor<V> implements TimeExtractor<V>
-  {
-
-    private long fixedTime;
-
-    private FixedTimeExtractor(long fixedTime)
-    {
-      this.fixedTime = fixedTime;
-    }
-
-    private FixedTimeExtractor()
-    {
-      // For kryo
-    }
-
-    @Override
-    public long getTime(V v)
-    {
-      return fixedTime;
-    }
-
-  }
-
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
 
@@ -176,7 +153,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
 
       Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
       if (timeExtractor != null) {
-        spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new FixedTimeExtractor(keyTime));
+        spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new TimeExtractor.FixedTimeExtractor(keyTime));
       } else {
         spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
       }
@@ -288,7 +265,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
       if (timeExtractor == null) {
         spillableSet = new SpillableSetImpl<>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
       } else {
-        spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new FixedTimeExtractor(timeExtractor.getTime(key)));
+        spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new TimeExtractor.FixedTimeExtractor(timeExtractor.getTime(key)));
       }
       spillableSet.setup(context);
       cache.put(key, spillableSet);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index ef06eea..6a16e62 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
@@ -54,10 +55,13 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye
     if (keyToWindowsMap == null) {
       // NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on.
       // This is logged in APEXMALHAR-2271
-      keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde);
+      // Workaround: use a fixed time so that session window data never expires and is all kept in one time bucket
+      keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde, new TimeExtractor.FixedTimeExtractor(Long.MAX_VALUE));
     }
   }
 
+
+
   @Override
   @SuppressWarnings("unchecked")
   public void remove(Window window)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 82428fb..1d2334d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -32,7 +32,6 @@ import org.junit.runner.Description;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
-import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.lib.util.TestUtils;
 import com.datatorrent.netlet.util.Slice;
 
@@ -66,13 +65,6 @@ public class ManagedTimeUnifiedStateImplTest
   public TestMeta testMeta = new TestMeta();
 
   @Test
-  public void testSerde() throws IOException
-  {
-    ManagedTimeUnifiedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
-    Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets());
-  }
-
-  @Test
   public void testSimplePutGet()
   {
     Slice one = ManagedStateTestUtils.getSliceFor("1");
@@ -129,7 +121,7 @@ public class ManagedTimeUnifiedStateImplTest
 
     testMeta.managedState.setup(testMeta.operatorContext);
 
-    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucket(time);
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
 
     //write data to disk explicitly
@@ -151,7 +143,7 @@ public class ManagedTimeUnifiedStateImplTest
 
     testMeta.managedState.setup(testMeta.operatorContext);
 
-    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time);
+    long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucket(time);
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
 
     //write data to disk explicitly

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
index 8ae4db7..ef67b94 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
@@ -31,7 +31,7 @@ class MockManagedStateContext implements ManagedStateContext
   private TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
   private Comparator<Slice> keyComparator = new SliceComparator();
   private BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
-  private TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+  private MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
 
   private final Context.OperatorContext operatorContext;
 
@@ -58,7 +58,7 @@ class MockManagedStateContext implements ManagedStateContext
   }
 
   @Override
-  public TimeBucketAssigner getTimeBucketAssigner()
+  public MovingBoundaryTimeBucketAssigner getTimeBucketAssigner()
   {
     return timeBucketAssigner;
   }
@@ -84,7 +84,7 @@ class MockManagedStateContext implements ManagedStateContext
     this.bucketsFileSystem = bucketsFileSystem;
   }
 
-  void setTimeBucketAssigner(TimeBucketAssigner timeBucketAssigner)
+  void setTimeBucketAssigner(MovingBoundaryTimeBucketAssigner timeBucketAssigner)
   {
     this.timeBucketAssigner = timeBucketAssigner;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
new file mode 100644
index 0000000..e4e5d2e
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class MovingBoundaryTimeBucketAssignerTest
+{
+
+  class TestMeta extends TestWatcher
+  {
+    MovingBoundaryTimeBucketAssigner timeBucketAssigner;
+    MockManagedStateContext mockManagedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
+      mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    MovingBoundaryTimeBucketAssigner deserialized = KryoCloneUtils.cloneObject(testMeta.timeBucketAssigner);
+    Assert.assertNotNull("time bucket assigner", deserialized);
+  }
+
+  @Test
+  public void testNumBuckets()
+  {
+    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
+    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
+
+    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+
+    Assert.assertEquals("num buckets", 2, testMeta.timeBucketAssigner.getNumBuckets());
+    testMeta.timeBucketAssigner.teardown();
+  }
+
+  @Test
+  public void testTimeBucketKey()
+  {
+    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
+    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
+
+    long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
+    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+
+    long time1 = referenceTime - Duration.standardMinutes(2).getMillis();
+    Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time1));
+
+    long time0 = referenceTime - Duration.standardMinutes(40).getMillis();
+    Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucket(time0));
+
+    long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis();
+    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(expiredTime));
+    testMeta.timeBucketAssigner.teardown();
+  }
+
+  @Test
+  public void testTimeBucketKeyExpiry()
+  {
+    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
+    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
+
+    long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
+    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+
+    long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
+    Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) );
+
+    long time2 = Duration.standardSeconds(10).getMillis()  + referenceTime;
+    Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) );
+
+    //Check for expiry of time1 now
+    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) );
+
+    testMeta.timeBucketAssigner.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
deleted file mode 100644
index 8ca0960..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.apex.malhar.lib.state.managed;
-
-import java.io.IOException;
-
-import org.joda.time.Duration;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class TimeBucketAssignerTest
-{
-
-  class TestMeta extends TestWatcher
-  {
-    TimeBucketAssigner timeBucketAssigner;
-    MockManagedStateContext mockManagedStateContext;
-
-    @Override
-    protected void starting(Description description)
-    {
-      timeBucketAssigner = new TimeBucketAssigner();
-      mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-    }
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  @Test
-  public void testSerde() throws IOException
-  {
-    TimeBucketAssigner deserialized = KryoCloneUtils.cloneObject(testMeta.timeBucketAssigner);
-    Assert.assertNotNull("time bucket assigner", deserialized);
-  }
-
-  @Test
-  public void testNumBuckets()
-  {
-    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
-    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
-
-    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
-
-    Assert.assertEquals("num buckets", 2, testMeta.timeBucketAssigner.getNumBuckets());
-    testMeta.timeBucketAssigner.teardown();
-  }
-
-  @Test
-  public void testTimeBucketKey()
-  {
-    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1));
-    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30));
-
-    long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
-    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
-
-    long time1 = referenceTime - Duration.standardMinutes(2).getMillis();
-    Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1));
-
-    long time0 = referenceTime - Duration.standardMinutes(40).getMillis();
-    Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time0));
-
-    long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis();
-    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime));
-    testMeta.timeBucketAssigner.teardown();
-  }
-
-  @Test
-  public void testTimeBucketKeyExpiry()
-  {
-    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
-    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
-
-    long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
-    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
-
-    long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
-    Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) );
-
-    long time2 = Duration.standardSeconds(10).getMillis()  + referenceTime;
-    Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time2) );
-
-    //Check for expiry of time1 now
-    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) );
-
-    testMeta.timeBucketAssigner.teardown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index f898e2d..512626e 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -58,7 +58,8 @@ import com.datatorrent.lib.util.KeyValPair;
 public class WindowedOperatorTest
 {
 
-  public static final long BASE = (System.currentTimeMillis() / 1000) * 1000;
+  // To test the extreme condition, count from a base time 365 days before the current time
+  public static final long BASE = ((System.currentTimeMillis() - 3600000L * 24 * 365) / 1000) * 1000;
 
   @Parameterized.Parameters
   public static Collection<Object[]> testParameters()