You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/10/03 22:43:58 UTC
apex-malhar git commit: APEXMALHAR-2273 #resolve do not fire
retraction trigger if retraction value is same as current value and
fireOnlyUpdatedPanes is true
Repository: apex-malhar
Updated Branches:
refs/heads/master 1333910fd -> 9f04db7f6
APEXMALHAR-2273 #resolve do not fire retraction trigger if retraction value is same as current value and fireOnlyUpdatedPanes is true
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9f04db7f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9f04db7f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9f04db7f
Branch: refs/heads/master
Commit: 9f04db7f6d33d2bcc9ca610e130c5694b58416af
Parents: 1333910
Author: David Yan <da...@datatorrent.com>
Authored: Thu Sep 29 16:35:55 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Oct 3 14:09:46 2016 -0700
----------------------------------------------------------------------
.../window/impl/AbstractWindowedOperator.java | 5 +--
.../window/impl/KeyedWindowedOperatorImpl.java | 19 ++++++++---
.../lib/window/impl/WindowedOperatorImpl.java | 10 ++++--
.../malhar/lib/window/WindowedOperatorTest.java | 36 ++++++++++++++++----
4 files changed, 54 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index c778523..0ece11e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -526,7 +526,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
public void fireTrigger(Window window, WindowState windowState)
{
if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
- fireRetractionTrigger(window);
+ fireRetractionTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
}
fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
windowState.lastTriggerFiredTime = currentDerivedTimestamp;
@@ -548,8 +548,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
* mode is ACCUMULATING_AND_RETRACTING
*
* @param window the window to fire the retraction trigger on
+ * @param fireOnlyUpdatedPanes Do not fire trigger if the retraction value is the same as the new value.
*/
- public abstract void fireRetractionTrigger(Window window);
+ public abstract void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes);
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index a38207a..6fab7de 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -79,7 +79,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
if (triggerOption != null &&
triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
// fire a retraction trigger because the session window will be enlarged
- fireRetractionTrigger(sessionWindow);
+ fireRetractionTrigger(sessionWindow, false);
}
// create a new session window that covers the timestamp
long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
@@ -105,8 +105,8 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
if (triggerOption != null &&
triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
// fire a retraction trigger because the two session windows will be merged to a new window
- fireRetractionTrigger(sessionWindow1);
- fireRetractionTrigger(sessionWindow2);
+ fireRetractionTrigger(sessionWindow1, false);
+ fireRetractionTrigger(sessionWindow2, false);
}
long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp());
long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(),
@@ -149,7 +149,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
{
for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
OutputValT outputVal = accumulation.getOutput(entry.getValue());
- if (fireOnlyUpdatedPanes) {
+ if (fireOnlyUpdatedPanes && retractionStorage != null) {
OutputValT oldValue = retractionStorage.get(window, entry.getKey());
if (oldValue != null && oldValue.equals(outputVal)) {
continue;
@@ -163,12 +163,21 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
}
@Override
- public void fireRetractionTrigger(Window window)
+ public void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes)
{
if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
throw new UnsupportedOperationException();
}
for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entries(window)) {
+ if (fireOnlyUpdatedPanes) {
+ AccumT currentAccum = dataStorage.get(window, entry.getKey());
+ if (currentAccum != null) {
+ OutputValT currentValue = accumulation.getOutput(currentAccum);
+ if (currentValue != null && currentValue.equals(entry.getValue())) {
+ continue;
+ }
+ }
+ }
output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index 7275d88..26e011a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -56,7 +56,7 @@ public class WindowedOperatorImpl<InputT, AccumT, OutputT>
{
AccumT accumulatedValue = dataStorage.get(window);
OutputT outputValue = accumulation.getOutput(accumulatedValue);
- if (fireOnlyUpdatedPanes) {
+ if (fireOnlyUpdatedPanes && retractionStorage != null) {
OutputT oldValue = retractionStorage.get(window);
if (oldValue != null && oldValue.equals(outputValue)) {
return;
@@ -69,13 +69,19 @@ public class WindowedOperatorImpl<InputT, AccumT, OutputT>
}
@Override
- public void fireRetractionTrigger(Window window)
+ public void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes)
{
if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
throw new UnsupportedOperationException();
}
OutputT oldValue = retractionStorage.get(window);
if (oldValue != null) {
+ if (fireOnlyUpdatedPanes) {
+ AccumT accumulatedValue = dataStorage.get(window);
+ if (accumulatedValue != null && oldValue.equals(accumulation.getOutput(accumulatedValue))) {
+ return;
+ }
+ }
output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index bc5d80f..15aba82 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -298,20 +298,36 @@ public class WindowedOperatorTest
@Test
public void testTriggerWithAccumulatingModeFiringAllPanes()
{
- testTriggerWithAccumulatingModeHelper(false);
+ testTrigger2(false, false);
+ }
+
+ @Test
+ public void testTriggerWithAccumulatingAndRetractingModeFiringAllPanes()
+ {
+ testTrigger2(false, true);
}
@Test
public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
{
- testTriggerWithAccumulatingModeHelper(true);
+ testTrigger2(true, false);
+ }
+
+ @Test
+ public void testTriggerWithAccumulatingAndRetractingModeFiringOnlyUpdatedPanes()
+ {
+ testTrigger2(true, true);
}
- public void testTriggerWithAccumulatingModeHelper(boolean firingOnlyUpdatedPanes)
+ private void testTrigger2(boolean firingOnlyUpdatedPanes, boolean testRetraction)
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
- TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
- .accumulatingFiredPanes();
+ TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
+ if (testRetraction) {
+ triggerOption.accumulatingAndRetractingFiredPanes();
+ } else {
+ triggerOption.accumulatingFiredPanes();
+ }
if (firingOnlyUpdatedPanes) {
triggerOption.firingOnlyUpdatedPanes();
}
@@ -342,8 +358,14 @@ public class WindowedOperatorTest
Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples
.isEmpty());
} else {
- Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
- Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+ if (testRetraction) {
+ Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+ Assert.assertEquals(-5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+ Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(1)).getValue().longValue());
+ } else {
+ Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+ Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+ }
}
windowedOperator.teardown();
}