You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/09/30 23:01:24 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2276 overwriting the value of a key in the same time bucket

Repository: apex-malhar
Updated Branches:
  refs/heads/master 58201fbf9 -> baff632ae


APEXMALHAR-2276 overwriting the value of a key in the same time bucket


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5a0d1ebc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5a0d1ebc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5a0d1ebc

Branch: refs/heads/master
Commit: 5a0d1ebc07900811cbe57aa7f2867d9e4574d4cf
Parents: 139d89f
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Sep 30 13:01:09 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Sep 30 14:24:15 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/state/managed/Bucket.java    |  5 ++++-
 .../managed/ManagedTimeUnifiedStateImplTest.java | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5a0d1ebc/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 0ed1865..74364a9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -389,12 +389,15 @@ public interface Bucket extends ManagedStateComponent
         sizeInBytes.getAndAdd(key.length);
         sizeInBytes.getAndAdd(Long.SIZE);
       }
-      if (timeBucket > bucketedValue.getTimeBucket()) {
+      if (timeBucket >= bucketedValue.getTimeBucket()) {
 
         int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
         sizeInBytes.getAndAdd(inc);
         bucketedValue.setTimeBucket(timeBucket);
         bucketedValue.setValue(value);
+      } else {
+        LOG.warn("ignoring {} {} {}; existing {} {}", key, value, timeBucket,
+            bucketedValue.getValue(), bucketedValue.getTimeBucket());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5a0d1ebc/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 6808b63..25596fa 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -88,6 +88,25 @@ public class ManagedTimeUnifiedStateImplTest
   }
 
   @Test
+  public void testPutWithMultipleValuesForAKey()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(time, one, one);
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.put(time, one, two);
+    Slice value = testMeta.managedState.getSync(time, one);
+    testMeta.managedState.endWindow();
+
+    Assert.assertEquals("value overwritten", two, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
   public void testAsyncGet() throws ExecutionException, InterruptedException
   {
     Slice one = ManagedStateTestUtils.getSliceFor("1");


[2/2] apex-malhar git commit: Merge commit 'refs/pull/437/head' of github.com:apache/apex-malhar

Posted by da...@apache.org.
Merge commit 'refs/pull/437/head' of github.com:apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/baff632a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/baff632a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/baff632a

Branch: refs/heads/master
Commit: baff632ae68d517013a731c4c19f9d721dfe56ce
Parents: 58201fb 5a0d1eb
Author: David Yan <da...@datatorrent.com>
Authored: Fri Sep 30 16:01:07 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Sep 30 16:01:07 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/state/managed/Bucket.java    |  5 ++++-
 .../managed/ManagedTimeUnifiedStateImplTest.java | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------