You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/11/15 02:09:55 UTC

apex-malhar git commit: APEXMALHAR-2335 APEXMALHAR-2333 APEXMALHAR-2334 #resolve #comment Problems on StateTracker

Repository: apex-malhar
Updated Branches:
  refs/heads/master a75b093df -> a25b6140c


APEXMALHAR-2335 APEXMALHAR-2333 APEXMALHAR-2334 #resolve #comment Problems on StateTracker


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a25b6140
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a25b6140
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a25b6140

Branch: refs/heads/master
Commit: a25b6140cac9653b10657061a2b59d7404ecf56a
Parents: a75b093
Author: brightchen <br...@datatorrent.com>
Authored: Thu Nov 10 14:59:52 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Mon Nov 14 16:22:40 2016 -0800

----------------------------------------------------------------------
 .../state/ManagedStateBenchmarkApp.java         |   2 +
 .../benchmark/state/StoreOperator.java          |  19 ++-
 .../state/ManagedStateBenchmarkAppTest.java     |   2 +-
 .../state/managed/AbstractManagedStateImpl.java |   4 +-
 .../apex/malhar/lib/state/managed/Bucket.java   |  53 +++++-
 .../malhar/lib/state/managed/StateTracker.java  | 160 +++++++------------
 .../serde/DefaultBlockReleaseStrategy.java      |  12 +-
 .../lib/utils/serde/WindowedBlockStream.java    |  24 +++
 .../lib/state/managed/DefaultBucketTest.java    |  10 +-
 .../lib/state/managed/StateTrackerTest.java     |  15 +-
 10 files changed, 159 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index eab02db..be615d0 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +79,7 @@ public class ManagedStateBenchmarkApp implements StreamingApplication
     String basePath = getStoreBasePath(conf);
     ManagedTimeUnifiedStateImpl store = new ManagedTimeUnifiedStateImpl();
     ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+    store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000));
     return store;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index ad92b60..f960d15 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -56,11 +56,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   //this is the store we are going to use
   private ManagedTimeUnifiedStateImpl store;
 
-  private long lastCheckPointWindowId = -1;
-  private long currentWindowId;
   private long tupleCount = 0;
   private int windowCountPerStatistics = 0;
   private long statisticsBeginTime = 0;
+  private long applicationBeginTime = 0;
+  private long totalTupleCount = 0;
+
 
   private ExecMode execMode = ExecMode.INSERT;
   private int timeRange = 1000 * 60;
@@ -89,11 +90,13 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   @Override
   public void beginWindow(long windowId)
   {
-    currentWindowId = windowId;
     store.beginWindow(windowId);
     if (statisticsBeginTime <= 0) {
       statisticsBeginTime = System.currentTimeMillis();
     }
+    if (applicationBeginTime <= 0) {
+      applicationBeginTime = System.currentTimeMillis();
+    }
   }
 
   @Override
@@ -226,7 +229,7 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   public void beforeCheckpoint(long windowId)
   {
     store.beforeCheckpoint(windowId);
-    logger.info("beforeCheckpoint {}", windowId);
+    logger.debug("beforeCheckpoint {}", windowId);
   }
 
   public ManagedTimeUnifiedStateImpl getStore()
@@ -241,8 +244,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
 
   private void logStatistics()
   {
-    long spentTime = System.currentTimeMillis() - statisticsBeginTime;
-    logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime);
+    final long now = System.currentTimeMillis();
+    long spentTime = now - statisticsBeginTime;
+    long totalSpentTime = now - applicationBeginTime;
+    totalTupleCount += tupleCount;
+    logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
+        totalTupleCount * 1000 / totalSpentTime);
 
     statisticsBeginTime = System.currentTimeMillis();
     tupleCount = 0;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
index 5279d36..4f03a10 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -86,7 +86,7 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
 
     // Create local cluster
     final LocalMode.Controller lc = lma.getController();
-    lc.run(300000);
+    lc.run(3000000);
 
     lc.shutdown();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 1c52c31..364bc19 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -46,7 +46,6 @@ import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.Futures;
 
 import com.datatorrent.api.Component;
-import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.Stateless;
@@ -158,8 +157,7 @@ public abstract class AbstractManagedStateImpl
 
   @NotNull
   @FieldSerializer.Bind(JavaSerializer.class)
-  private Duration checkStateSizeInterval = Duration.millis(
-      DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue);
+  private Duration checkStateSizeInterval = Duration.millis(60000);
 
   @FieldSerializer.Bind(JavaSerializer.class)
   private Duration durationPreventingFreeingSpace;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index cbc4e03..3c18b2f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -435,17 +435,51 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
      * Free memory up to the given windowId
      * This method will be called by another thread. Adding concurrency control to Stream would impact the performance.
      * This method only calculates the size of the memory that could be released and then sends free memory request to the operator thread
+     *
+     * We intend to manage memory through keyStream and valueStream, but we can't prevent callers from using other mechanisms to manage memory.
+     * In that case the maps must be cleaned up as well.
      */
     @Override
     public long freeMemory(long windowId) throws IOException
     {
-      // calculate the size first and then send the release memory request. It could reduce the chance of conflict and increase the performance.
-      long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId);
+      long memoryFreed = 0;
+      Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> entryIter = committedData.entrySet().iterator();
+      while (entryIter.hasNext()) {
+        Map.Entry<Long, Map<Slice, BucketedValue>> bucketEntry = entryIter.next();
+        if (bucketEntry.getKey() > windowId) {
+          break;
+        }
+
+        Map<Slice, BucketedValue> windowData = bucketEntry.getValue();
+        entryIter.remove();
+
+        for (Map.Entry<Slice, BucketedValue> entry : windowData.entrySet()) {
+          memoryFreed += entry.getKey().length + entry.getValue().getSize();
+        }
+      }
+
+      fileCache.clear();
+      if (cachedBucketMetas != null) {
+
+        for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
+          FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
+          if (reader != null) {
+            memoryFreed += tbm.getSizeInBytes();
+            reader.close();
+          }
+        }
+      }
+
+      sizeInBytes.getAndAdd(-memoryFreed);
+
+      //add the windowId to the queue to let operator thread release memory from keyStream and valueStream
       windowsForFreeMemory.add(windowId);
-      return size;
+
+      return memoryFreed;
     }
 
     /**
+     * Release the memory managed by keyStream and valueStream.
      * This operation must be called from operator thread. It won't do anything if no memory to be freed
      */
     protected long releaseMemory()
@@ -459,10 +493,10 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
         memoryFreed += originSize - (keyStream.size() + valueStream.size());
       }
 
-      if (memoryFreed > 0) {
-        LOG.debug("Total freed memory size: {}", memoryFreed);
-        sizeInBytes.getAndAdd(-memoryFreed);
-      }
+      //release the free memory immediately
+      keyStream.releaseAllFreeMemory();
+      valueStream.releaseAllFreeMemory();
+
       return memoryFreed;
     }
 
@@ -482,6 +516,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
     @Override
     public void committed(long committedWindowId)
     {
+      releaseMemory();
       Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator();
 
       while (stateIterator.hasNext()) {
@@ -518,7 +553,9 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
             }
           }
           sizeInBytes.getAndAdd(-memoryFreed);
-          committedData.put(savedWindow, bucketData);
+          if (!bucketData.isEmpty()) {
+            committedData.put(savedWindow, bucketData);
+          }
           stateIterator.remove();
         } else {
           break;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 5678107..225439f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -19,11 +19,12 @@
 package org.apache.apex.malhar.lib.state.managed;
 
 import java.io.IOException;
-import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 
 import javax.validation.constraints.NotNull;
 
@@ -31,57 +32,51 @@ import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.mutable.MutableLong;
+
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 /**
  * Tracks the size of state in memory and evicts buckets.
  */
 class StateTracker extends TimerTask
 {
-  //bucket id -> bucket id & time wrapper
-  private final transient ConcurrentHashMap<Long, BucketIdTimeWrapper> bucketAccessTimes = new ConcurrentHashMap<>();
-
-  private transient ConcurrentSkipListSet<BucketIdTimeWrapper> bucketHeap;
-
   private final transient Timer memoryFreeService = new Timer();
 
   protected transient AbstractManagedStateImpl managedStateImpl;
 
+  private transient long lastUpdateAccessTime = 0;
+  private final transient Set<Long> accessedBucketIds = Sets.newHashSet();
+  private final transient LinkedHashMap<Long, MutableLong> bucketLastAccess = new LinkedHashMap<>(16, 0.75f, true);
+
+  private int updateAccessTimeInterval = 500;
+
   void setup(@NotNull AbstractManagedStateImpl managedStateImpl)
   {
     this.managedStateImpl = Preconditions.checkNotNull(managedStateImpl, "managed state impl");
 
-    this.bucketHeap = new ConcurrentSkipListSet<>(
-        new Comparator<BucketIdTimeWrapper>()
-        {
-          //Note: this comparator imposes orderings that are inconsistent with equals.
-          @Override
-          public int compare(BucketIdTimeWrapper o1, BucketIdTimeWrapper o2)
-          {
-            if (o1.getLastAccessedTime() < o2.getLastAccessedTime()) {
-              return -1;
-            }
-            if (o1.getLastAccessedTime() > o2.getLastAccessedTime()) {
-              return 1;
-            }
-
-            return Long.compare(o1.bucketId, o2.bucketId);
-          }
-        });
     long intervalMillis = managedStateImpl.getCheckStateSizeInterval().getMillis();
     memoryFreeService.scheduleAtFixedRate(this, intervalMillis, intervalMillis);
   }
 
   void bucketAccessed(long bucketId)
   {
-    BucketIdTimeWrapper idTimeWrapper = bucketAccessTimes.get(bucketId);
-    if (idTimeWrapper != null) {
-      bucketHeap.remove(idTimeWrapper);
-    }  else {
-      idTimeWrapper = new BucketIdTimeWrapper(bucketId);
+    long now = System.currentTimeMillis();
+    if (accessedBucketIds.add(bucketId) || now - lastUpdateAccessTime > updateAccessTimeInterval) {
+      synchronized (bucketLastAccess) {
+        for (long id : accessedBucketIds) {
+          MutableLong lastAccessTime = bucketLastAccess.get(id);
+          if (lastAccessTime != null) {
+            lastAccessTime.setValue(now);
+          } else {
+            bucketLastAccess.put(id, new MutableLong(now));
+          }
+        }
+      }
+      accessedBucketIds.clear();
+      lastUpdateAccessTime = now;
     }
-    idTimeWrapper.setLastAccessedTime(System.currentTimeMillis());
-    bucketHeap.add(idTimeWrapper);
   }
 
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@@ -105,88 +100,51 @@ class StateTracker extends TimerTask
           durationMillis = duration.getMillis();
         }
 
-        BucketIdTimeWrapper idTimeWrapper;
-        while (bytesSum > managedStateImpl.getMaxMemorySize() && bucketHeap.size() > 0 &&
-            null != (idTimeWrapper = bucketHeap.first())) {
-          //trigger buckets to free space
-
-          if (System.currentTimeMillis() - idTimeWrapper.getLastAccessedTime() < durationMillis) {
-            //if the least recently used bucket cannot free up space because it was accessed within the
-            //specified duration then subsequent buckets cannot free space as well because this heap is ordered by time.
-            break;
-          }
-          long bucketId = idTimeWrapper.bucketId;
-          Bucket bucket = managedStateImpl.getBucket(bucketId);
-          if (bucket != null) {
-
-            synchronized (bucket) {
-              long sizeFreed;
-              try {
-                sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
-                LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
-              } catch (IOException e) {
-                managedStateImpl.throwable.set(e);
-                throw new RuntimeException("freeing " + bucketId, e);
+        synchronized (bucketLastAccess) {
+          long now = System.currentTimeMillis();
+          for (Iterator<Map.Entry<Long, MutableLong>> iterator = bucketLastAccess.entrySet().iterator();
+              bytesSum > managedStateImpl.getMaxMemorySize() && iterator.hasNext(); ) {
+            Map.Entry<Long, MutableLong> entry = iterator.next();
+            if (now - entry.getValue().longValue() < durationMillis) {
+              break;
+            }
+            long bucketId = entry.getKey();
+            Bucket bucket = managedStateImpl.getBucket(bucketId);
+            if (bucket != null) {
+              synchronized (bucket) {
+                long sizeFreed;
+                try {
+                  sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
+                  LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
+                } catch (IOException e) {
+                  managedStateImpl.throwable.set(e);
+                  throw new RuntimeException("freeing " + bucketId, e);
+                }
+                bytesSum -= sizeFreed;
+              }
+              if (bucket.getSizeInBytes() == 0) {
+                iterator.remove();
               }
-              bytesSum -= sizeFreed;
             }
-            bucketHeap.remove(idTimeWrapper);
-            bucketAccessTimes.remove(bucketId);
           }
         }
       }
     }
   }
 
-  void teardown()
+  public int getUpdateAccessTimeInterval()
   {
-    memoryFreeService.cancel();
+    return updateAccessTimeInterval;
   }
 
-  /**
-   * Wrapper class for bucket id and the last time the bucket was accessed.
-   */
-  private static class BucketIdTimeWrapper
+  public void setUpdateAccessTimeInterval(int updateAccessTimeInterval)
   {
-    private final long bucketId;
-    private long lastAccessedTime;
-
-    BucketIdTimeWrapper(long bucketId)
-    {
-      this.bucketId = bucketId;
-    }
-
-    private synchronized long getLastAccessedTime()
-    {
-      return lastAccessedTime;
-    }
-
-    private synchronized void setLastAccessedTime(long lastAccessedTime)
-    {
-      this.lastAccessedTime = lastAccessedTime;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-      if (this == o) {
-        return true;
-      }
-      if (!(o instanceof BucketIdTimeWrapper)) {
-        return false;
-      }
-
-      BucketIdTimeWrapper that = (BucketIdTimeWrapper)o;
-      //Note: the comparator used with bucket heap imposes orderings that are inconsistent with equals
-      return bucketId == that.bucketId;
-
-    }
+    this.updateAccessTimeInterval = updateAccessTimeInterval;
+  }
 
-    @Override
-    public int hashCode()
-    {
-      return (int)(bucketId ^ (bucketId >>> 32));
-    }
+  void teardown()
+  {
+    memoryFreeService.cancel();
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(StateTracker.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
index 93929e4..365cbc3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
@@ -18,9 +18,8 @@
  */
 package org.apache.apex.malhar.lib.utils.serde;
 
-import java.util.Arrays;
-
 import org.apache.commons.collections.buffer.CircularFifoBuffer;
+import org.apache.commons.lang3.mutable.MutableInt;
 
 /**
  * This implementation get the minimum number of free blocks in the period to release.
@@ -30,7 +29,6 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
 {
   public static final int DEFAULT_PERIOD = 60; // 60 reports
   private CircularFifoBuffer freeBlockNumQueue;
-  private Integer[] tmpArray;
 
   public DefaultBlockReleaseStrategy()
   {
@@ -40,8 +38,6 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
   public DefaultBlockReleaseStrategy(int period)
   {
     freeBlockNumQueue = new CircularFifoBuffer(period);
-    tmpArray = new Integer[period];
-    Arrays.fill(tmpArray, 0);
   }
 
   /**
@@ -54,7 +50,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
     if (freeBlockNum < 0) {
       throw new IllegalArgumentException("The number of free blocks could not less than zero.");
     }
-    freeBlockNumQueue.add(freeBlockNum);
+    freeBlockNumQueue.add(new MutableInt(freeBlockNum));
   }
 
   /**
@@ -66,7 +62,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
   {
     int minNum = Integer.MAX_VALUE;
     for (Object num : freeBlockNumQueue) {
-      minNum = Math.min((Integer)num, minNum);
+      minNum = Math.min(((MutableInt)num).intValue(), minNum);
     }
     return minNum;
   }
@@ -89,7 +85,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
      * decrease by released blocks
      */
     for (Object num : freeBlockNumQueue) {
-      freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0));
+      ((MutableInt)num).setValue(Math.max(((MutableInt)num).intValue() - numReleasedBlocks, 0));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
index fa4cd73..53710f8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
@@ -246,4 +246,28 @@ public class WindowedBlockStream extends BlockStream implements WindowListener,
     }
   }
 
+  /**
+   * This method releases all free memory immediately.
+   * The release strategy does not decide how many blocks are released; it is only notified of the released count.
+   */
+  public void releaseAllFreeMemory()
+  {
+    int releasedBlocks = 0;
+
+    Iterator<Integer> iter = freeBlockIds.iterator();
+    while (iter.hasNext()) {
+      //release blocks
+      int blockId = iter.next();
+      iter.remove();
+      blocks.remove(blockId);
+      releasedBlocks++;
+    }
+
+    /**
+     * report number of released blocks
+     */
+    if (releasedBlocks > 0) {
+      releaseStrategy.releasedBlocks(releasedBlocks);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 6645a98..f7e24de 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -32,7 +32,6 @@ import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource;
 import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
 import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
 import org.apache.apex.malhar.lib.utils.serde.StringSerde;
-import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
 
 import com.google.common.primitives.Longs;
 
@@ -223,13 +222,8 @@ public class DefaultBucketTest
     testMeta.defaultBucket.get(keySlice, -1, ReadSource.MEMORY);
     long sizeFreed = currentSize - testMeta.defaultBucket.getSizeInBytes();
 
-    SerializationBuffer tmpBuffer = new SerializationBuffer(new WindowedBlockStream());
-    tmpBuffer.writeBytes(keyPrefix);
-    tmpBuffer.writeString(key);
-    tmpBuffer.writeString(value);
-    int expectedFreedSize = tmpBuffer.toSlice().toByteArray().length; //key prefix, key length, key; value length, value
-    Assert.assertEquals("size freed", expectedFreedSize, sizeFreed);
-    Assert.assertEquals("existing size", currentSize - expectedFreedSize, testMeta.defaultBucket.getSizeInBytes());
+    Assert.assertEquals("size freed", initSize, sizeFreed);
+    Assert.assertEquals("existing size", currentSize - initSize, testMeta.defaultBucket.getSizeInBytes());
 
     testMeta.defaultBucket.teardown();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
index 07e141a..ca78186 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
@@ -20,7 +20,7 @@
 package org.apache.apex.malhar.lib.state.managed;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
 import org.joda.time.Duration;
@@ -30,7 +30,7 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
@@ -82,7 +82,7 @@ public class StateTrackerTest
 
     testMeta.managedState.latch.await();
     testMeta.managedState.teardown();
-    Assert.assertEquals("freed bucket", Lists.newArrayList(1L), testMeta.managedState.freedBuckets);
+    Assert.assertEquals("freed bucket", Sets.newHashSet(1L), testMeta.managedState.freedBuckets);
   }
 
   @Test
@@ -101,7 +101,7 @@ public class StateTrackerTest
 
     testMeta.managedState.latch.await();
     testMeta.managedState.teardown();
-    Assert.assertEquals("freed bucket", Lists.newArrayList(1L, 2L), testMeta.managedState.freedBuckets);
+    Assert.assertEquals("freed bucket", Sets.newHashSet(1L, 2L), testMeta.managedState.freedBuckets);
   }
 
   @Test
@@ -128,7 +128,7 @@ public class StateTrackerTest
   private static class MockManagedStateImpl extends ManagedStateImpl
   {
     CountDownLatch latch;
-    List<Long> freedBuckets = Lists.newArrayList();
+    Set<Long> freedBuckets = Sets.newHashSet();
 
     @Override
     protected Bucket newBucket(long bucketId)
@@ -149,8 +149,9 @@ public class StateTrackerTest
     public long freeMemory(long windowId) throws IOException
     {
       long freedBytes = super.freeMemory(windowId);
-      ((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId());
-      ((MockManagedStateImpl)managedStateContext).latch.countDown();
+      if (((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId())) {
+        ((MockManagedStateImpl)managedStateContext).latch.countDown();
+      }
       return freedBytes;
     }