You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/11/15 02:09:55 UTC
apex-malhar git commit: APEXMALHAR-2335 APEXMALHAR-2333
APEXMALHAR-2334 #resolve #comment Problems on StateTracker
Repository: apex-malhar
Updated Branches:
refs/heads/master a75b093df -> a25b6140c
APEXMALHAR-2335 APEXMALHAR-2333 APEXMALHAR-2334 #resolve #comment Problems on StateTracker
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a25b6140
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a25b6140
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a25b6140
Branch: refs/heads/master
Commit: a25b6140cac9653b10657061a2b59d7404ecf56a
Parents: a75b093
Author: brightchen <br...@datatorrent.com>
Authored: Thu Nov 10 14:59:52 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Mon Nov 14 16:22:40 2016 -0800
----------------------------------------------------------------------
.../state/ManagedStateBenchmarkApp.java | 2 +
.../benchmark/state/StoreOperator.java | 19 ++-
.../state/ManagedStateBenchmarkAppTest.java | 2 +-
.../state/managed/AbstractManagedStateImpl.java | 4 +-
.../apex/malhar/lib/state/managed/Bucket.java | 53 +++++-
.../malhar/lib/state/managed/StateTracker.java | 160 +++++++------------
.../serde/DefaultBlockReleaseStrategy.java | 12 +-
.../lib/utils/serde/WindowedBlockStream.java | 24 +++
.../lib/state/managed/DefaultBucketTest.java | 10 +-
.../lib/state/managed/StateTrackerTest.java | 15 +-
10 files changed, 159 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index eab02db..be615d0 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Random;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +79,7 @@ public class ManagedStateBenchmarkApp implements StreamingApplication
String basePath = getStoreBasePath(conf);
ManagedTimeUnifiedStateImpl store = new ManagedTimeUnifiedStateImpl();
((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+ store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000));
return store;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index ad92b60..f960d15 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -56,11 +56,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
//this is the store we are going to use
private ManagedTimeUnifiedStateImpl store;
- private long lastCheckPointWindowId = -1;
- private long currentWindowId;
private long tupleCount = 0;
private int windowCountPerStatistics = 0;
private long statisticsBeginTime = 0;
+ private long applicationBeginTime = 0;
+ private long totalTupleCount = 0;
+
private ExecMode execMode = ExecMode.INSERT;
private int timeRange = 1000 * 60;
@@ -89,11 +90,13 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
@Override
public void beginWindow(long windowId)
{
- currentWindowId = windowId;
store.beginWindow(windowId);
if (statisticsBeginTime <= 0) {
statisticsBeginTime = System.currentTimeMillis();
}
+ if (applicationBeginTime <= 0) {
+ applicationBeginTime = System.currentTimeMillis();
+ }
}
@Override
@@ -226,7 +229,7 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
public void beforeCheckpoint(long windowId)
{
store.beforeCheckpoint(windowId);
- logger.info("beforeCheckpoint {}", windowId);
+ logger.debug("beforeCheckpoint {}", windowId);
}
public ManagedTimeUnifiedStateImpl getStore()
@@ -241,8 +244,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
private void logStatistics()
{
- long spentTime = System.currentTimeMillis() - statisticsBeginTime;
- logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime);
+ final long now = System.currentTimeMillis();
+ long spentTime = now - statisticsBeginTime;
+ long totalSpentTime = now - applicationBeginTime;
+ totalTupleCount += tupleCount;
+ logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
+ totalTupleCount * 1000 / totalSpentTime);
statisticsBeginTime = System.currentTimeMillis();
tupleCount = 0;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
index 5279d36..4f03a10 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -86,7 +86,7 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
// Create local cluster
final LocalMode.Controller lc = lma.getController();
- lc.run(300000);
+ lc.run(3000000);
lc.shutdown();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 1c52c31..364bc19 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -46,7 +46,6 @@ import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.Futures;
import com.datatorrent.api.Component;
-import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.Stateless;
@@ -158,8 +157,7 @@ public abstract class AbstractManagedStateImpl
@NotNull
@FieldSerializer.Bind(JavaSerializer.class)
- private Duration checkStateSizeInterval = Duration.millis(
- DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue);
+ private Duration checkStateSizeInterval = Duration.millis(60000);
@FieldSerializer.Bind(JavaSerializer.class)
private Duration durationPreventingFreeingSpace;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index cbc4e03..3c18b2f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -435,17 +435,51 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
* Free memory up to the given windowId
* This method will be called by another thread. Adding concurrency control to Stream would impact the performance.
* This method only calculates the size of the memory that could be released and then sends free memory request to the operator thread
+ *
+ * We intend to manage memory by keyStream and valueStream. But the we can't avoid caller use other mechanism to manage memory.
+ * It is required to cleanup maps in this case.
*/
@Override
public long freeMemory(long windowId) throws IOException
{
- // calculate the size first and then send the release memory request. It could reduce the chance of conflict and increase the performance.
- long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId);
+ long memoryFreed = 0;
+ Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> entryIter = committedData.entrySet().iterator();
+ while (entryIter.hasNext()) {
+ Map.Entry<Long, Map<Slice, BucketedValue>> bucketEntry = entryIter.next();
+ if (bucketEntry.getKey() > windowId) {
+ break;
+ }
+
+ Map<Slice, BucketedValue> windowData = bucketEntry.getValue();
+ entryIter.remove();
+
+ for (Map.Entry<Slice, BucketedValue> entry : windowData.entrySet()) {
+ memoryFreed += entry.getKey().length + entry.getValue().getSize();
+ }
+ }
+
+ fileCache.clear();
+ if (cachedBucketMetas != null) {
+
+ for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
+ FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
+ if (reader != null) {
+ memoryFreed += tbm.getSizeInBytes();
+ reader.close();
+ }
+ }
+ }
+
+ sizeInBytes.getAndAdd(-memoryFreed);
+
+ //add the windowId to the queue to let operator thread release memory from keyStream and valueStream
windowsForFreeMemory.add(windowId);
- return size;
+
+ return memoryFreed;
}
/**
+ * Release the memory managed by keyStream and valueStream.
* This operation must be called from operator thread. It won't do anything if no memory to be freed
*/
protected long releaseMemory()
@@ -459,10 +493,10 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
memoryFreed += originSize - (keyStream.size() + valueStream.size());
}
- if (memoryFreed > 0) {
- LOG.debug("Total freed memory size: {}", memoryFreed);
- sizeInBytes.getAndAdd(-memoryFreed);
- }
+ //release the free memory immediately
+ keyStream.releaseAllFreeMemory();
+ valueStream.releaseAllFreeMemory();
+
return memoryFreed;
}
@@ -482,6 +516,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
@Override
public void committed(long committedWindowId)
{
+ releaseMemory();
Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator();
while (stateIterator.hasNext()) {
@@ -518,7 +553,9 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
}
}
sizeInBytes.getAndAdd(-memoryFreed);
- committedData.put(savedWindow, bucketData);
+ if (!bucketData.isEmpty()) {
+ committedData.put(savedWindow, bucketData);
+ }
stateIterator.remove();
} else {
break;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 5678107..225439f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -19,11 +19,12 @@
package org.apache.apex.malhar.lib.state.managed;
import java.io.IOException;
-import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import javax.validation.constraints.NotNull;
@@ -31,57 +32,51 @@ import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.mutable.MutableLong;
+
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
/**
* Tracks the size of state in memory and evicts buckets.
*/
class StateTracker extends TimerTask
{
- //bucket id -> bucket id & time wrapper
- private final transient ConcurrentHashMap<Long, BucketIdTimeWrapper> bucketAccessTimes = new ConcurrentHashMap<>();
-
- private transient ConcurrentSkipListSet<BucketIdTimeWrapper> bucketHeap;
-
private final transient Timer memoryFreeService = new Timer();
protected transient AbstractManagedStateImpl managedStateImpl;
+ private transient long lastUpdateAccessTime = 0;
+ private final transient Set<Long> accessedBucketIds = Sets.newHashSet();
+ private final transient LinkedHashMap<Long, MutableLong> bucketLastAccess = new LinkedHashMap<>(16, 0.75f, true);
+
+ private int updateAccessTimeInterval = 500;
+
void setup(@NotNull AbstractManagedStateImpl managedStateImpl)
{
this.managedStateImpl = Preconditions.checkNotNull(managedStateImpl, "managed state impl");
- this.bucketHeap = new ConcurrentSkipListSet<>(
- new Comparator<BucketIdTimeWrapper>()
- {
- //Note: this comparator imposes orderings that are inconsistent with equals.
- @Override
- public int compare(BucketIdTimeWrapper o1, BucketIdTimeWrapper o2)
- {
- if (o1.getLastAccessedTime() < o2.getLastAccessedTime()) {
- return -1;
- }
- if (o1.getLastAccessedTime() > o2.getLastAccessedTime()) {
- return 1;
- }
-
- return Long.compare(o1.bucketId, o2.bucketId);
- }
- });
long intervalMillis = managedStateImpl.getCheckStateSizeInterval().getMillis();
memoryFreeService.scheduleAtFixedRate(this, intervalMillis, intervalMillis);
}
void bucketAccessed(long bucketId)
{
- BucketIdTimeWrapper idTimeWrapper = bucketAccessTimes.get(bucketId);
- if (idTimeWrapper != null) {
- bucketHeap.remove(idTimeWrapper);
- } else {
- idTimeWrapper = new BucketIdTimeWrapper(bucketId);
+ long now = System.currentTimeMillis();
+ if (accessedBucketIds.add(bucketId) || now - lastUpdateAccessTime > updateAccessTimeInterval) {
+ synchronized (bucketLastAccess) {
+ for (long id : accessedBucketIds) {
+ MutableLong lastAccessTime = bucketLastAccess.get(id);
+ if (lastAccessTime != null) {
+ lastAccessTime.setValue(now);
+ } else {
+ bucketLastAccess.put(id, new MutableLong(now));
+ }
+ }
+ }
+ accessedBucketIds.clear();
+ lastUpdateAccessTime = now;
}
- idTimeWrapper.setLastAccessedTime(System.currentTimeMillis());
- bucketHeap.add(idTimeWrapper);
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@@ -105,88 +100,51 @@ class StateTracker extends TimerTask
durationMillis = duration.getMillis();
}
- BucketIdTimeWrapper idTimeWrapper;
- while (bytesSum > managedStateImpl.getMaxMemorySize() && bucketHeap.size() > 0 &&
- null != (idTimeWrapper = bucketHeap.first())) {
- //trigger buckets to free space
-
- if (System.currentTimeMillis() - idTimeWrapper.getLastAccessedTime() < durationMillis) {
- //if the least recently used bucket cannot free up space because it was accessed within the
- //specified duration then subsequent buckets cannot free space as well because this heap is ordered by time.
- break;
- }
- long bucketId = idTimeWrapper.bucketId;
- Bucket bucket = managedStateImpl.getBucket(bucketId);
- if (bucket != null) {
-
- synchronized (bucket) {
- long sizeFreed;
- try {
- sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
- LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
- } catch (IOException e) {
- managedStateImpl.throwable.set(e);
- throw new RuntimeException("freeing " + bucketId, e);
+ synchronized (bucketLastAccess) {
+ long now = System.currentTimeMillis();
+ for (Iterator<Map.Entry<Long, MutableLong>> iterator = bucketLastAccess.entrySet().iterator();
+ bytesSum > managedStateImpl.getMaxMemorySize() && iterator.hasNext(); ) {
+ Map.Entry<Long, MutableLong> entry = iterator.next();
+ if (now - entry.getValue().longValue() < durationMillis) {
+ break;
+ }
+ long bucketId = entry.getKey();
+ Bucket bucket = managedStateImpl.getBucket(bucketId);
+ if (bucket != null) {
+ synchronized (bucket) {
+ long sizeFreed;
+ try {
+ sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
+ LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
+ } catch (IOException e) {
+ managedStateImpl.throwable.set(e);
+ throw new RuntimeException("freeing " + bucketId, e);
+ }
+ bytesSum -= sizeFreed;
+ }
+ if (bucket.getSizeInBytes() == 0) {
+ iterator.remove();
}
- bytesSum -= sizeFreed;
}
- bucketHeap.remove(idTimeWrapper);
- bucketAccessTimes.remove(bucketId);
}
}
}
}
}
- void teardown()
+ public int getUpdateAccessTimeInterval()
{
- memoryFreeService.cancel();
+ return updateAccessTimeInterval;
}
- /**
- * Wrapper class for bucket id and the last time the bucket was accessed.
- */
- private static class BucketIdTimeWrapper
+ public void setUpdateAccessTimeInterval(int updateAccessTimeInterval)
{
- private final long bucketId;
- private long lastAccessedTime;
-
- BucketIdTimeWrapper(long bucketId)
- {
- this.bucketId = bucketId;
- }
-
- private synchronized long getLastAccessedTime()
- {
- return lastAccessedTime;
- }
-
- private synchronized void setLastAccessedTime(long lastAccessedTime)
- {
- this.lastAccessedTime = lastAccessedTime;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (!(o instanceof BucketIdTimeWrapper)) {
- return false;
- }
-
- BucketIdTimeWrapper that = (BucketIdTimeWrapper)o;
- //Note: the comparator used with bucket heap imposes orderings that are inconsistent with equals
- return bucketId == that.bucketId;
-
- }
+ this.updateAccessTimeInterval = updateAccessTimeInterval;
+ }
- @Override
- public int hashCode()
- {
- return (int)(bucketId ^ (bucketId >>> 32));
- }
+ void teardown()
+ {
+ memoryFreeService.cancel();
}
private static final Logger LOG = LoggerFactory.getLogger(StateTracker.class);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
index 93929e4..365cbc3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
@@ -18,9 +18,8 @@
*/
package org.apache.apex.malhar.lib.utils.serde;
-import java.util.Arrays;
-
import org.apache.commons.collections.buffer.CircularFifoBuffer;
+import org.apache.commons.lang3.mutable.MutableInt;
/**
* This implementation get the minimum number of free blocks in the period to release.
@@ -30,7 +29,6 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
{
public static final int DEFAULT_PERIOD = 60; // 60 reports
private CircularFifoBuffer freeBlockNumQueue;
- private Integer[] tmpArray;
public DefaultBlockReleaseStrategy()
{
@@ -40,8 +38,6 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
public DefaultBlockReleaseStrategy(int period)
{
freeBlockNumQueue = new CircularFifoBuffer(period);
- tmpArray = new Integer[period];
- Arrays.fill(tmpArray, 0);
}
/**
@@ -54,7 +50,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
if (freeBlockNum < 0) {
throw new IllegalArgumentException("The number of free blocks could not less than zero.");
}
- freeBlockNumQueue.add(freeBlockNum);
+ freeBlockNumQueue.add(new MutableInt(freeBlockNum));
}
/**
@@ -66,7 +62,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
{
int minNum = Integer.MAX_VALUE;
for (Object num : freeBlockNumQueue) {
- minNum = Math.min((Integer)num, minNum);
+ minNum = Math.min(((MutableInt)num).intValue(), minNum);
}
return minNum;
}
@@ -89,7 +85,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
* decrease by released blocks
*/
for (Object num : freeBlockNumQueue) {
- freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0));
+ ((MutableInt)num).setValue(Math.max(((MutableInt)num).intValue() - numReleasedBlocks, 0));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
index fa4cd73..53710f8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
@@ -246,4 +246,28 @@ public class WindowedBlockStream extends BlockStream implements WindowListener,
}
}
+ /**
+ * This method releases all free memory immediately.
+ * This method will not be controlled by release strategy
+ */
+ public void releaseAllFreeMemory()
+ {
+ int releasedBlocks = 0;
+
+ Iterator<Integer> iter = freeBlockIds.iterator();
+ while (iter.hasNext()) {
+ //release blocks
+ int blockId = iter.next();
+ iter.remove();
+ blocks.remove(blockId);
+ releasedBlocks++;
+ }
+
+ /**
+ * report number of released blocks
+ */
+ if (releasedBlocks > 0) {
+ releaseStrategy.releasedBlocks(releasedBlocks);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 6645a98..f7e24de 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -32,7 +32,6 @@ import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource;
import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
import org.apache.apex.malhar.lib.utils.serde.StringSerde;
-import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
import com.google.common.primitives.Longs;
@@ -223,13 +222,8 @@ public class DefaultBucketTest
testMeta.defaultBucket.get(keySlice, -1, ReadSource.MEMORY);
long sizeFreed = currentSize - testMeta.defaultBucket.getSizeInBytes();
- SerializationBuffer tmpBuffer = new SerializationBuffer(new WindowedBlockStream());
- tmpBuffer.writeBytes(keyPrefix);
- tmpBuffer.writeString(key);
- tmpBuffer.writeString(value);
- int expectedFreedSize = tmpBuffer.toSlice().toByteArray().length; //key prefix, key length, key; value length, value
- Assert.assertEquals("size freed", expectedFreedSize, sizeFreed);
- Assert.assertEquals("existing size", currentSize - expectedFreedSize, testMeta.defaultBucket.getSizeInBytes());
+ Assert.assertEquals("size freed", initSize, sizeFreed);
+ Assert.assertEquals("existing size", currentSize - initSize, testMeta.defaultBucket.getSizeInBytes());
testMeta.defaultBucket.teardown();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
index 07e141a..ca78186 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
@@ -20,7 +20,7 @@
package org.apache.apex.malhar.lib.state.managed;
import java.io.IOException;
-import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.joda.time.Duration;
@@ -30,7 +30,7 @@ import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
@@ -82,7 +82,7 @@ public class StateTrackerTest
testMeta.managedState.latch.await();
testMeta.managedState.teardown();
- Assert.assertEquals("freed bucket", Lists.newArrayList(1L), testMeta.managedState.freedBuckets);
+ Assert.assertEquals("freed bucket", Sets.newHashSet(1L), testMeta.managedState.freedBuckets);
}
@Test
@@ -101,7 +101,7 @@ public class StateTrackerTest
testMeta.managedState.latch.await();
testMeta.managedState.teardown();
- Assert.assertEquals("freed bucket", Lists.newArrayList(1L, 2L), testMeta.managedState.freedBuckets);
+ Assert.assertEquals("freed bucket", Sets.newHashSet(1L, 2L), testMeta.managedState.freedBuckets);
}
@Test
@@ -128,7 +128,7 @@ public class StateTrackerTest
private static class MockManagedStateImpl extends ManagedStateImpl
{
CountDownLatch latch;
- List<Long> freedBuckets = Lists.newArrayList();
+ Set<Long> freedBuckets = Sets.newHashSet();
@Override
protected Bucket newBucket(long bucketId)
@@ -149,8 +149,9 @@ public class StateTrackerTest
public long freeMemory(long windowId) throws IOException
{
long freedBytes = super.freeMemory(windowId);
- ((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId());
- ((MockManagedStateImpl)managedStateContext).latch.countDown();
+ if (((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId())) {
+ ((MockManagedStateImpl)managedStateContext).latch.countDown();
+ }
return freedBytes;
}