You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@apex.apache.org by bh...@apache.org on 2017/03/10 11:13:38 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2350 #resolve #comment The key and value stream should match with the bucket

Repository: apex-malhar
Updated Branches:
  refs/heads/master 6dcd82120 -> 570ecaeb7


APEXMALHAR-2350 #resolve #comment The key and value stream should match with the bucket


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4edbec89
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4edbec89
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4edbec89

Branch: refs/heads/master
Commit: 4edbec89e4b6855e2f4a7517ca5538df0ad2a35e
Parents: a202cdc
Author: brightchen <br...@datatorrent.com>
Authored: Mon Nov 28 16:17:15 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Mon Mar 6 11:29:44 2017 -0800

----------------------------------------------------------------------
 .../managed/ManagedTimeUnifiedStateImpl.java    |   2 +-
 .../lib/state/spillable/SpillableMapImpl.java   |  30 +++++-
 .../lib/utils/serde/KeyValueSerdeManager.java   |  24 ++++-
 .../state/spillable/SpillableMapImplTest.java   | 103 +++++++++++++++++++
 4 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index 62ebbc5..82d381c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -216,7 +216,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
     @Override
     protected void addBucketName(long bucketId)
     {
-      long operatorId = (long)managedStateContext.getOperatorContext().getId();
+      long operatorId = managedStateContext.getOperatorContext().getId();
       if (!bucketNamesOnFS.contains(operatorId)) {
         bucketNamesOnFS.add(operatorId);
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index e7071a2..56a3b0e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
 import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
@@ -150,7 +151,7 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
       return val;
     }
 
-    Slice valSlice = store.getSync(getBucket(key), keyValueSerdeManager.serializeDataKey(key, false));
+    Slice valSlice = store.getSync(getBucketTimeOrId(key), keyValueSerdeManager.serializeDataKey(key, false));
 
     if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
       return null;
@@ -236,13 +237,29 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   @Override
   public void endWindow()
   {
+    boolean isTimeUnifiedStore = (store instanceof ManagedTimeUnifiedStateImpl);
     for (K key: cache.getChangedKeys()) {
-      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true),
+      //the getBucket() returned in fact is time, the bucket assign then assigned the bucketId
+      long timeOrBucketId = bucket;
+      long bucketId = timeOrBucketId;
+      if (isTimeUnifiedStore) {
+        timeOrBucketId = getBucketTimeOrId(key);
+        bucketId = ((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId);
+      }
+      keyValueSerdeManager.updateBuffersForBucketChange(bucketId);
+      store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true),
           keyValueSerdeManager.serializeValue(cache.get(key)));
     }
 
     for (K key: cache.getRemovedKeys()) {
-      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
+      long timeOrBucketId = bucket;
+      long bucketId = timeOrBucketId;
+      if (isTimeUnifiedStore) {
+        timeOrBucketId = getBucketTimeOrId(key);
+        bucketId = ((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId);
+      }
+      keyValueSerdeManager.updateBuffersForBucketChange(bucketId);
+      store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
     }
     cache.endWindow();
     keyValueSerdeManager.resetReadBuffer();
@@ -253,7 +270,12 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   {
   }
 
-  private long getBucket(K key)
+  /**
+   *
+   * @param key
+   * @return The bucket time for time unified store or bucket id for store with fixed bucket
+   */
+  private long getBucketTimeOrId(K key)
   {
     return timeExtractor != null ? timeExtractor.getTime(key) : bucket;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
index e74c7a3..405683b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -28,6 +28,7 @@ import com.datatorrent.netlet.util.Slice;
  */
 public class KeyValueSerdeManager<K, V>
 {
+  public static final long INVALID_BUCKET_ID = -1;
   protected Serde<K> keySerde;
   protected Serde<V> valueSerde;
 
@@ -36,6 +37,8 @@ public class KeyValueSerdeManager<K, V>
 
   protected SerializationBuffer valueBuffer;
 
+  private long lastBucketId = INVALID_BUCKET_ID;
+  private transient BucketProvider bucketProvider;
 
   protected KeyValueSerdeManager()
   {
@@ -50,13 +53,28 @@ public class KeyValueSerdeManager<K, V>
 
   public void setup(BucketProvider bp, long bucketId)
   {
-    //the bucket will not change for this class. so get streams from setup, else, need to set stream before serialize
-    Bucket bucketInst = bp.ensureBucket(bucketId);
-    this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
+    bucketProvider = bp;
+    updateBuffersForBucketChange(bucketId);
+  }
 
+  /**
+   * The bucket can be changed. The write buffer should also be changed if bucket changed.
+   * @param bucketId
+   */
+  public void updateBuffersForBucketChange(long bucketId)
+  {
+    if (lastBucketId == bucketId) {
+      return;
+    }
+
+    Bucket bucketInst = bucketProvider.ensureBucket(bucketId);
+    this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
     keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream());
+
+    lastBucketId = bucketId;
   }
 
+
   public Slice serializeKey(K key, boolean write)
   {
     SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index 760bc5c..ce51a03 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -25,8 +25,13 @@ import org.junit.runner.RunWith;
 
 import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
 import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
+import com.google.common.base.Preconditions;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -452,4 +457,102 @@ public class SpillableMapImplTest
 
     map1.teardown();
   }
+
+
+  protected static class SerdeManagerForTest<K, V> extends AffixKeyValueSerdeManager<K, V>
+  {
+    public SerdeManagerForTest(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde)
+    {
+      super(metaKeySuffix, dataKeyIdentifier, keySerde, valueSerde);
+    }
+
+    public SerializationBuffer getValueBuffer()
+    {
+      return valueBuffer;
+    }
+
+    public SerializationBuffer getKeyBufferForWrite()
+    {
+      return keyBufferForWrite;
+    }
+  }
+
+  protected static class SpillableMapImplForTest<K, V> extends SpillableMapImpl<K, V>
+  {
+    protected SerdeManagerForTest<K, V> serdeManager;
+
+    public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> serdeKey,
+        Serde<V> serdeValue)
+    {
+      super(store, identifier, bucket, serdeKey, serdeValue);
+      serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+      keyValueSerdeManager = serdeManager;
+    }
+
+    public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, Serde<K> serdeKey,
+        Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+    {
+      super(store, identifier, serdeKey, serdeValue, timeExtractor);
+      serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+      keyValueSerdeManager = serdeManager;
+    }
+  }
+
+  @Test
+  @Parameters({"TimeUnifiedManagedState"})
+  public void serializationBufferTest(String opt)
+  {
+    SerializationBuffer keyBuffer = null;
+    SerializationBuffer valueBuffer = null;
+    SerializationBuffer currentBuffer;
+
+    setup(opt);
+    SpillableMapImplForTest<String, String> map;
+    if (te == null) {
+      map = new SpillableMapImplForTest<>(store,ID1,0L,new StringSerde(), new StringSerde());
+    } else {
+      map = new SpillableMapImplForTest<>(store,ID1,new StringSerde(), new StringSerde(), te);
+    }
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    map.put("a", "1");
+
+    map.endWindow();
+    store.endWindow();
+
+    currentBuffer = map.serdeManager.getKeyBufferForWrite();
+    Assert.assertTrue(currentBuffer != keyBuffer);
+    keyBuffer = currentBuffer;
+
+    currentBuffer = map.serdeManager.getValueBuffer();
+    Assert.assertTrue(currentBuffer != valueBuffer);
+    valueBuffer = currentBuffer;
+
+    ++windowId;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    //each put use different key to make sure use the different bucket
+    map.put("b", "2");
+
+    map.endWindow();
+    store.endWindow();
+
+    currentBuffer = map.serdeManager.getKeyBufferForWrite();
+    Assert.assertTrue(currentBuffer != keyBuffer);
+    keyBuffer = currentBuffer;
+
+    currentBuffer = map.serdeManager.getValueBuffer();
+    Assert.assertTrue(currentBuffer != valueBuffer);
+    valueBuffer = currentBuffer;
+
+    map.teardown();
+    store.teardown();
+  }
 }


[2/2] apex-malhar git commit: Merge commit 'refs/pull/515/head' of https://github.com/apache/apex-malhar

Posted by bh...@apache.org.
Merge commit 'refs/pull/515/head' of https://github.com/apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/570ecaeb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/570ecaeb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/570ecaeb

Branch: refs/heads/master
Commit: 570ecaeb7aea528fedd2a3281a8c8d5ca53576e0
Parents: 6dcd821 4edbec8
Author: bhupeshchawda <bh...@apache.org>
Authored: Fri Mar 10 15:41:01 2017 +0530
Committer: bhupeshchawda <bh...@apache.org>
Committed: Fri Mar 10 15:41:01 2017 +0530

----------------------------------------------------------------------
 .../managed/ManagedTimeUnifiedStateImpl.java    |   2 +-
 .../lib/state/spillable/SpillableMapImpl.java   |  30 +++++-
 .../lib/utils/serde/KeyValueSerdeManager.java   |  24 ++++-
 .../state/spillable/SpillableMapImplTest.java   | 103 +++++++++++++++++++
 4 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------