You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2017/03/10 11:13:38 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2350 #resolve #comment The
key and value stream should match with the bucket
Repository: apex-malhar
Updated Branches:
refs/heads/master 6dcd82120 -> 570ecaeb7
APEXMALHAR-2350 #resolve #comment The key and value stream should match with the bucket
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4edbec89
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4edbec89
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4edbec89
Branch: refs/heads/master
Commit: 4edbec89e4b6855e2f4a7517ca5538df0ad2a35e
Parents: a202cdc
Author: brightchen <br...@datatorrent.com>
Authored: Mon Nov 28 16:17:15 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Mon Mar 6 11:29:44 2017 -0800
----------------------------------------------------------------------
.../managed/ManagedTimeUnifiedStateImpl.java | 2 +-
.../lib/state/spillable/SpillableMapImpl.java | 30 +++++-
.../lib/utils/serde/KeyValueSerdeManager.java | 24 ++++-
.../state/spillable/SpillableMapImplTest.java | 103 +++++++++++++++++++
4 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index 62ebbc5..82d381c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -216,7 +216,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
@Override
protected void addBucketName(long bucketId)
{
- long operatorId = (long)managedStateContext.getOperatorContext().getId();
+ long operatorId = managedStateContext.getOperatorContext().getId();
if (!bucketNamesOnFS.contains(operatorId)) {
bucketNamesOnFS.add(operatorId);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index e7071a2..56a3b0e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
@@ -150,7 +151,7 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
return val;
}
- Slice valSlice = store.getSync(getBucket(key), keyValueSerdeManager.serializeDataKey(key, false));
+ Slice valSlice = store.getSync(getBucketTimeOrId(key), keyValueSerdeManager.serializeDataKey(key, false));
if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
return null;
@@ -236,13 +237,29 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
@Override
public void endWindow()
{
+ boolean isTimeUnifiedStore = (store instanceof ManagedTimeUnifiedStateImpl);
for (K key: cache.getChangedKeys()) {
- store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true),
+ //for a time unified store getBucketTimeOrId() in fact returns a time; the time bucket assigner then maps that time to the bucketId
+ long timeOrBucketId = bucket;
+ long bucketId = timeOrBucketId;
+ if (isTimeUnifiedStore) {
+ timeOrBucketId = getBucketTimeOrId(key);
+ bucketId = ((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId);
+ }
+ keyValueSerdeManager.updateBuffersForBucketChange(bucketId);
+ store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true),
keyValueSerdeManager.serializeValue(cache.get(key)));
}
for (K key: cache.getRemovedKeys()) {
- store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
+ long timeOrBucketId = bucket;
+ long bucketId = timeOrBucketId;
+ if (isTimeUnifiedStore) {
+ timeOrBucketId = getBucketTimeOrId(key);
+ bucketId = ((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId);
+ }
+ keyValueSerdeManager.updateBuffersForBucketChange(bucketId);
+ store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
}
cache.endWindow();
keyValueSerdeManager.resetReadBuffer();
@@ -253,7 +270,12 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
{
}
- private long getBucket(K key)
+ /**
+ *
+ * @param key
+ * @return The bucket time for time unified store or bucket id for store with fixed bucket
+ */
+ private long getBucketTimeOrId(K key)
{
return timeExtractor != null ? timeExtractor.getTime(key) : bucket;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
index e74c7a3..405683b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -28,6 +28,7 @@ import com.datatorrent.netlet.util.Slice;
*/
public class KeyValueSerdeManager<K, V>
{
+ public static final long INVALID_BUCKET_ID = -1;
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
@@ -36,6 +37,8 @@ public class KeyValueSerdeManager<K, V>
protected SerializationBuffer valueBuffer;
+ private long lastBucketId = INVALID_BUCKET_ID;
+ private transient BucketProvider bucketProvider;
protected KeyValueSerdeManager()
{
@@ -50,13 +53,28 @@ public class KeyValueSerdeManager<K, V>
public void setup(BucketProvider bp, long bucketId)
{
- //the bucket will not change for this class. so get streams from setup, else, need to set stream before serialize
- Bucket bucketInst = bp.ensureBucket(bucketId);
- this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
+ bucketProvider = bp;
+ updateBuffersForBucketChange(bucketId);
+ }
+ /**
+ * The bucket can change. When it does, the value and key write buffers must be re-created on the new bucket's streams.
+ * @param bucketId
+ */
+ public void updateBuffersForBucketChange(long bucketId)
+ {
+ if (lastBucketId == bucketId) {
+ return;
+ }
+
+ Bucket bucketInst = bucketProvider.ensureBucket(bucketId);
+ this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream());
+
+ lastBucketId = bucketId;
}
+
public Slice serializeKey(K key, boolean write)
{
SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index 760bc5c..ce51a03 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -25,8 +25,13 @@ import org.junit.runner.RunWith;
import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import com.google.common.base.Preconditions;
+
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
@@ -452,4 +457,102 @@ public class SpillableMapImplTest
map1.teardown();
}
+
+
+ protected static class SerdeManagerForTest<K, V> extends AffixKeyValueSerdeManager<K, V>
+ {
+ public SerdeManagerForTest(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde)
+ {
+ super(metaKeySuffix, dataKeyIdentifier, keySerde, valueSerde);
+ }
+
+ public SerializationBuffer getValueBuffer()
+ {
+ return valueBuffer;
+ }
+
+ public SerializationBuffer getKeyBufferForWrite()
+ {
+ return keyBufferForWrite;
+ }
+ }
+
+ protected static class SpillableMapImplForTest<K, V> extends SpillableMapImpl<K, V>
+ {
+ protected SerdeManagerForTest<K, V> serdeManager;
+
+ public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue)
+ {
+ super(store, identifier, bucket, serdeKey, serdeValue);
+ serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+ keyValueSerdeManager = serdeManager;
+ }
+
+ public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, Serde<K> serdeKey,
+ Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+ {
+ super(store, identifier, serdeKey, serdeValue, timeExtractor);
+ serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+ keyValueSerdeManager = serdeManager;
+ }
+ }
+
+ @Test
+ @Parameters({"TimeUnifiedManagedState"})
+ public void serializationBufferTest(String opt)
+ {
+ SerializationBuffer keyBuffer = null;
+ SerializationBuffer valueBuffer = null;
+ SerializationBuffer currentBuffer;
+
+ setup(opt);
+ SpillableMapImplForTest<String, String> map;
+ if (te == null) {
+ map = new SpillableMapImplForTest<>(store,ID1,0L,new StringSerde(), new StringSerde());
+ } else {
+ map = new SpillableMapImplForTest<>(store,ID1,new StringSerde(), new StringSerde(), te);
+ }
+
+ store.setup(testMeta.operatorContext);
+ map.setup(testMeta.operatorContext);
+
+ long windowId = 0L;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ map.put("a", "1");
+
+ map.endWindow();
+ store.endWindow();
+
+ currentBuffer = map.serdeManager.getKeyBufferForWrite();
+ Assert.assertTrue(currentBuffer != keyBuffer);
+ keyBuffer = currentBuffer;
+
+ currentBuffer = map.serdeManager.getValueBuffer();
+ Assert.assertTrue(currentBuffer != valueBuffer);
+ valueBuffer = currentBuffer;
+
+ ++windowId;
+ store.beginWindow(windowId);
+ map.beginWindow(windowId);
+
+ //each put uses a different key to make sure a different bucket is used
+ map.put("b", "2");
+
+ map.endWindow();
+ store.endWindow();
+
+ currentBuffer = map.serdeManager.getKeyBufferForWrite();
+ Assert.assertTrue(currentBuffer != keyBuffer);
+ keyBuffer = currentBuffer;
+
+ currentBuffer = map.serdeManager.getValueBuffer();
+ Assert.assertTrue(currentBuffer != valueBuffer);
+ valueBuffer = currentBuffer;
+
+ map.teardown();
+ store.teardown();
+ }
}
[2/2] apex-malhar git commit: Merge commit 'refs/pull/515/head' of
https://github.com/apache/apex-malhar
Posted by bh...@apache.org.
Merge commit 'refs/pull/515/head' of https://github.com/apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/570ecaeb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/570ecaeb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/570ecaeb
Branch: refs/heads/master
Commit: 570ecaeb7aea528fedd2a3281a8c8d5ca53576e0
Parents: 6dcd821 4edbec8
Author: bhupeshchawda <bh...@apache.org>
Authored: Fri Mar 10 15:41:01 2017 +0530
Committer: bhupeshchawda <bh...@apache.org>
Committed: Fri Mar 10 15:41:01 2017 +0530
----------------------------------------------------------------------
.../managed/ManagedTimeUnifiedStateImpl.java | 2 +-
.../lib/state/spillable/SpillableMapImpl.java | 30 +++++-
.../lib/utils/serde/KeyValueSerdeManager.java | 24 ++++-
.../state/spillable/SpillableMapImplTest.java | 103 +++++++++++++++++++
4 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------