You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/10/03 22:43:58 UTC

apex-malhar git commit: APEXMALHAR-2273 #resolve do not fire retraction trigger if retraction value is same as current value and fireOnlyUpdatedPanes is true

Repository: apex-malhar
Updated Branches:
  refs/heads/master 1333910fd -> 9f04db7f6


APEXMALHAR-2273 #resolve Do not fire the retraction trigger if the retraction value is the same as the current value and fireOnlyUpdatedPanes is true


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9f04db7f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9f04db7f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9f04db7f

Branch: refs/heads/master
Commit: 9f04db7f6d33d2bcc9ca610e130c5694b58416af
Parents: 1333910
Author: David Yan <da...@datatorrent.com>
Authored: Thu Sep 29 16:35:55 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Oct 3 14:09:46 2016 -0700

----------------------------------------------------------------------
 .../window/impl/AbstractWindowedOperator.java   |  5 +--
 .../window/impl/KeyedWindowedOperatorImpl.java  | 19 ++++++++---
 .../lib/window/impl/WindowedOperatorImpl.java   | 10 ++++--
 .../malhar/lib/window/WindowedOperatorTest.java | 36 ++++++++++++++++----
 4 files changed, 54 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index c778523..0ece11e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -526,7 +526,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   public void fireTrigger(Window window, WindowState windowState)
   {
     if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
-      fireRetractionTrigger(window);
+      fireRetractionTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
     }
     fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
     windowState.lastTriggerFiredTime = currentDerivedTimestamp;
@@ -548,8 +548,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
    * mode is ACCUMULATING_AND_RETRACTING
    *
    * @param window the window to fire the retraction trigger on
+   * @param fireOnlyUpdatedPanes Do not fire trigger if the retraction value is the same as the new value.
    */
-  public abstract void fireRetractionTrigger(Window window);
+  public abstract void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes);
 
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index a38207a..6fab7de 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -79,7 +79,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
           if (triggerOption != null &&
               triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
             // fire a retraction trigger because the session window will be enlarged
-            fireRetractionTrigger(sessionWindow);
+            fireRetractionTrigger(sessionWindow, false);
           }
           // create a new session window that covers the timestamp
           long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
@@ -105,8 +105,8 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
         if (triggerOption != null &&
             triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
           // fire a retraction trigger because the two session windows will be merged to a new window
-          fireRetractionTrigger(sessionWindow1);
-          fireRetractionTrigger(sessionWindow2);
+          fireRetractionTrigger(sessionWindow1, false);
+          fireRetractionTrigger(sessionWindow2, false);
         }
         long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp());
         long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(),
@@ -149,7 +149,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
   {
     for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
       OutputValT outputVal = accumulation.getOutput(entry.getValue());
-      if (fireOnlyUpdatedPanes) {
+      if (fireOnlyUpdatedPanes && retractionStorage != null) {
         OutputValT oldValue = retractionStorage.get(window, entry.getKey());
         if (oldValue != null && oldValue.equals(outputVal)) {
           continue;
@@ -163,12 +163,21 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
   }
 
   @Override
-  public void fireRetractionTrigger(Window window)
+  public void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes)
   {
     if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
       throw new UnsupportedOperationException();
     }
     for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entries(window)) {
+      if (fireOnlyUpdatedPanes) {
+        AccumT currentAccum = dataStorage.get(window, entry.getKey());
+        if (currentAccum != null) {
+          OutputValT currentValue = accumulation.getOutput(currentAccum);
+          if (currentValue != null && currentValue.equals(entry.getValue())) {
+            continue;
+          }
+        }
+      }
       output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index 7275d88..26e011a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -56,7 +56,7 @@ public class WindowedOperatorImpl<InputT, AccumT, OutputT>
   {
     AccumT accumulatedValue = dataStorage.get(window);
     OutputT outputValue = accumulation.getOutput(accumulatedValue);
-    if (fireOnlyUpdatedPanes) {
+    if (fireOnlyUpdatedPanes && retractionStorage != null) {
       OutputT oldValue = retractionStorage.get(window);
       if (oldValue != null && oldValue.equals(outputValue)) {
         return;
@@ -69,13 +69,19 @@ public class WindowedOperatorImpl<InputT, AccumT, OutputT>
   }
 
   @Override
-  public void fireRetractionTrigger(Window window)
+  public void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes)
   {
     if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
       throw new UnsupportedOperationException();
     }
     OutputT oldValue = retractionStorage.get(window);
     if (oldValue != null) {
+      if (fireOnlyUpdatedPanes) {
+        AccumT accumulatedValue = dataStorage.get(window);
+        if (accumulatedValue != null && oldValue.equals(accumulation.getOutput(accumulatedValue))) {
+          return;
+        }
+      }
       output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index bc5d80f..15aba82 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -298,20 +298,36 @@ public class WindowedOperatorTest
   @Test
   public void testTriggerWithAccumulatingModeFiringAllPanes()
   {
-    testTriggerWithAccumulatingModeHelper(false);
+    testTrigger2(false, false);
+  }
+
+  @Test
+  public void testTriggerWithAccumulatingAndRetractingModeFiringAllPanes()
+  {
+    testTrigger2(false, true);
   }
 
   @Test
   public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
   {
-    testTriggerWithAccumulatingModeHelper(true);
+    testTrigger2(true, false);
+  }
+
+  @Test
+  public void testTriggerWithAccumulatingAndRetractingModeFiringOnlyUpdatedPanes()
+  {
+    testTrigger2(true, true);
   }
 
-  public void testTriggerWithAccumulatingModeHelper(boolean firingOnlyUpdatedPanes)
+  private void testTrigger2(boolean firingOnlyUpdatedPanes, boolean testRetraction)
   {
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
-    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
-        .accumulatingFiredPanes();
+    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
+    if (testRetraction) {
+      triggerOption.accumulatingAndRetractingFiredPanes();
+    } else {
+      triggerOption.accumulatingFiredPanes();
+    }
     if (firingOnlyUpdatedPanes) {
       triggerOption.firingOnlyUpdatedPanes();
     }
@@ -342,8 +358,14 @@ public class WindowedOperatorTest
       Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples
           .isEmpty());
     } else {
-      Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
-      Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      if (testRetraction) {
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        Assert.assertEquals(-5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(1)).getValue().longValue());
+      } else {
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      }
     }
     windowedOperator.teardown();
   }