You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/26 07:31:49 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2354 Support for heuristic watermark
Repository: apex-malhar
Updated Branches:
refs/heads/master 88295aed8 -> f22b269a9
APEXMALHAR-2354 Support for heuristic watermark
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1290c726
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1290c726
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1290c726
Branch: refs/heads/master
Commit: 1290c72649cd925fa795fd1234b29499ecb87189
Parents: cf896b0
Author: bhupeshchawda <bh...@apache.org>
Authored: Mon Jan 16 18:23:00 2017 +0530
Committer: bhupeshchawda <bh...@apache.org>
Committed: Tue Jan 24 18:08:56 2017 +0530
----------------------------------------------------------------------
.../lib/window/ImplicitWatermarkGenerator.java | 49 ++++++++++++
.../window/impl/AbstractWindowedOperator.java | 27 ++++++-
.../impl/FixedDiffEventTimeWatermarkGen.java | 83 ++++++++++++++++++++
.../FixedDiffProcessingTimeWatermarkGen.java | 76 ++++++++++++++++++
.../malhar/lib/window/WindowedOperatorTest.java | 34 ++++++++
5 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java
new file mode 100644
index 0000000..c7e91a1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Component;
+
+/**
+ * Interface for generators of implicit watermarks.
+ * An operator that does not want to rely on explicit watermarks (generated upstream)
+ * can use an implementation of this interface to derive watermarks on its own.
+ */
+@InterfaceStability.Evolving
+public interface ImplicitWatermarkGenerator extends Component
+{
+  /**
+   * Called on arrival of every tuple, so the generator can update the internal state
+   * from which it later derives watermarks.
+   *
+   * @param t the incoming windowed tuple
+   * @param currentProcessingTime the current notion of processing time
+   *        (usually the system time derived from the window id)
+   */
+  void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime);
+
+  /**
+   * Returns the current watermark tuple as per the generator's state.
+   *
+   * <p>Implementations must never return {@code null}: callers read the timestamp of the
+   * returned tuple directly.
+   *
+   * @param currentProcessingTime the current notion of processing time
+   *        (usually the system time derived from the window id)
+   * @return the latest watermark tuple computed by the implementation; never {@code null}
+   */
+  ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 22e8525..b755eb4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.spillable.WindowListener;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
@@ -92,6 +93,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
protected long fixedWatermarkMillis = -1;
private transient long streamingWindowId;
private transient TreeMap<Long, Long> streamingWindowToLatenessHorizon = new TreeMap<>();
+ private ImplicitWatermarkGenerator implicitWatermarkGenerator;
private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();
@@ -148,6 +150,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
// do the accumulation
accumulateTuple(windowedTuple);
processWindowState(windowedTuple);
+ if (implicitWatermarkGenerator != null) {
+ implicitWatermarkGenerator.processTupleForWatermark(windowedTuple, currentDerivedTimestamp);
+ }
}
}
@@ -264,6 +269,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
this.nextWatermark = timestamp;
}
+
/**
* Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we
* don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally
@@ -276,6 +282,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
this.fixedWatermarkMillis = millis;
}
+  /**
+   * Sets the generator used to derive watermarks implicitly. This is for operators that do not
+   * receive watermark tuples from upstream.
+   *
+   * <p>The operator forwards its setup/teardown and each accumulated tuple to the generator.
+   * At the end of every streaming window it queries the generator and adopts the returned
+   * watermark only when it is later than the pending one. Passing {@code null} disables
+   * implicit watermark generation.
+   *
+   * @param implicitWatermarkGenerator the generator to use, or {@code null} for none
+   */
+  public void setImplicitWatermarkGenerator(ImplicitWatermarkGenerator implicitWatermarkGenerator)
+  {
+    this.implicitWatermarkGenerator = implicitWatermarkGenerator;
+  }
+
public void validate() throws ValidationException
{
if (accumulation == null) {
@@ -429,6 +440,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
if (retractionStorage != null) {
retractionStorage.setup(context);
}
+ if (implicitWatermarkGenerator != null) {
+ implicitWatermarkGenerator.setup(context);
+ }
for (Component component : components.values()) {
component.setup(context);
}
@@ -445,6 +459,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
if (retractionStorage != null) {
retractionStorage.teardown();
}
+ if (implicitWatermarkGenerator != null) {
+ implicitWatermarkGenerator.teardown();
+ }
for (Component component : components.values()) {
component.teardown();
}
@@ -490,9 +507,15 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
protected void processWatermarkAtEndWindow()
{
- if (fixedWatermarkMillis > 0) {
- nextWatermark = currentDerivedTimestamp - fixedWatermarkMillis;
+ long implicitWatermark = -1;
+ if (implicitWatermarkGenerator != null) {
+ implicitWatermark = implicitWatermarkGenerator
+ .getWatermarkTuple(currentDerivedTimestamp).getTimestamp();
}
+ if (implicitWatermark > nextWatermark) {
+ nextWatermark = implicitWatermark;
+ }
+
if (nextWatermark > 0 && currentWatermark < nextWatermark) {
long horizon = nextWatermark - allowedLatenessMillis;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java
new file mode 100644
index 0000000..1e61a29
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import javax.annotation.Nonnegative;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * An {@link ImplicitWatermarkGenerator} implementation for generating watermarks
+ * based on event time.
+ *
+ * <p>The generated watermark trails the largest event timestamp seen so far by a fixed,
+ * non-negative difference. Before any tuple has been processed, the watermark is
+ * {@code -1 - fixedDifference}.
+ */
+@InterfaceStability.Evolving
+public class FixedDiffEventTimeWatermarkGen implements ImplicitWatermarkGenerator
+{
+  /** How far the watermark trails the latest event time, in the unit of the tuple timestamps. */
+  @Nonnegative
+  private long fixedDifference;
+  /** Largest event timestamp seen so far; -1 until the first tuple is processed. */
+  private long maxEventTime = -1;
+
+  /**
+   * No-arg constructor so serialization frameworks such as Kryo can instantiate this class,
+   * e.g. when the enclosing operator's state is restored. Not intended for application use.
+   */
+  private FixedDiffEventTimeWatermarkGen()
+  {
+  }
+
+  /**
+   * Creates a generator whose watermark trails the latest event time by {@code fixedDifference}.
+   *
+   * @param fixedDifference the non-negative lag between the latest event time and the watermark
+   * @throws IllegalArgumentException if {@code fixedDifference} is negative
+   */
+  public FixedDiffEventTimeWatermarkGen(long fixedDifference)
+  {
+    this.fixedDifference = checkNonNegative(fixedDifference);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Remembers the tuple's timestamp if it is the largest seen so far; processing time is ignored.
+   */
+  @Override
+  public void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime)
+  {
+    maxEventTime = Math.max(maxEventTime, t.getTimestamp());
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Returns a watermark at {@code maxEventTime - fixedDifference}; processing time is ignored.
+   * Since {@code maxEventTime} never drops below -1 and {@code fixedDifference} is non-negative,
+   * the subtraction cannot overflow.
+   */
+  @Override
+  public ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime)
+  {
+    return new WatermarkImpl(maxEventTime - fixedDifference);
+  }
+
+  @Override
+  public void setup(Context context)
+  {
+    // no resources to acquire
+  }
+
+  @Override
+  public void teardown()
+  {
+    // no resources to release
+  }
+
+  /**
+   * @return the lag between the latest event time and the generated watermark
+   */
+  public long getFixedDifference()
+  {
+    return fixedDifference;
+  }
+
+  /**
+   * Sets the lag between the latest event time and the generated watermark.
+   *
+   * @param fixedDifference the non-negative lag
+   * @throws IllegalArgumentException if {@code fixedDifference} is negative
+   */
+  public void setFixedDifference(long fixedDifference)
+  {
+    this.fixedDifference = checkNonNegative(fixedDifference);
+  }
+
+  /** Enforces the {@code @Nonnegative} contract, which JSR-305 does not check at runtime. */
+  private static long checkNonNegative(long fixedDifference)
+  {
+    if (fixedDifference < 0) {
+      throw new IllegalArgumentException("fixedDifference must be non-negative, got " + fixedDifference);
+    }
+    return fixedDifference;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java
new file mode 100644
index 0000000..a4d9e1a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * An {@link ImplicitWatermarkGenerator} implementation for generating watermarks
+ * based on processing time.
+ *
+ * <p>The generated watermark trails the current processing time by a fixed, non-negative
+ * number of milliseconds; incoming tuples do not influence it.
+ */
+@InterfaceStability.Evolving
+public class FixedDiffProcessingTimeWatermarkGen implements ImplicitWatermarkGenerator
+{
+  /** How far, in milliseconds, the watermark trails the current processing time. */
+  private long fixedWatermarkMillis;
+
+  /**
+   * No-arg constructor so serialization frameworks such as Kryo can instantiate this class,
+   * e.g. when the enclosing operator's state is restored. Not intended for application use.
+   */
+  private FixedDiffProcessingTimeWatermarkGen()
+  {
+  }
+
+  /**
+   * Creates a generator whose watermark trails processing time by {@code fixedWatermarkMillis}.
+   *
+   * @param fixedWatermarkMillis the non-negative lag in milliseconds
+   * @throws IllegalArgumentException if {@code fixedWatermarkMillis} is negative
+   */
+  public FixedDiffProcessingTimeWatermarkGen(long fixedWatermarkMillis)
+  {
+    this.fixedWatermarkMillis = checkNonNegative(fixedWatermarkMillis);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>This generator depends only on processing time, so tuples are ignored.
+   */
+  @Override
+  public void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime)
+  {
+    // intentionally empty: the watermark depends only on processing time
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Returns a watermark at {@code currentProcessingTime - fixedWatermarkMillis}.
+   */
+  @Override
+  public ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime)
+  {
+    return new WatermarkImpl(currentProcessingTime - fixedWatermarkMillis);
+  }
+
+  @Override
+  public void setup(Context context)
+  {
+    // no resources to acquire
+  }
+
+  @Override
+  public void teardown()
+  {
+    // no resources to release
+  }
+
+  /**
+   * @return the lag, in milliseconds, between processing time and the generated watermark
+   */
+  public long getFixedWatermarkMillis()
+  {
+    return fixedWatermarkMillis;
+  }
+
+  /**
+   * Sets the lag between processing time and the generated watermark.
+   *
+   * @param fixedWatermarkMillis the non-negative lag in milliseconds
+   * @throws IllegalArgumentException if {@code fixedWatermarkMillis} is negative
+   */
+  public void setFixedWatermarkMillis(long fixedWatermarkMillis)
+  {
+    this.fixedWatermarkMillis = checkNonNegative(fixedWatermarkMillis);
+  }
+
+  /** A negative lag would place the watermark ahead of processing time, so reject it. */
+  private static long checkNonNegative(long fixedWatermarkMillis)
+  {
+    if (fixedWatermarkMillis < 0) {
+      throw new IllegalArgumentException("fixedWatermarkMillis must be non-negative, got " + fixedWatermarkMillis);
+    }
+    return fixedWatermarkMillis;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 512626e..39f4b6e 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -36,6 +36,7 @@ import org.junit.runners.Parameterized;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.window.impl.FixedDiffEventTimeWatermarkGen;
import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
@@ -218,6 +219,39 @@ public class WindowedOperatorTest
windowedOperator.teardown();
}
+  @Test
+  public void testImplicitWatermarks()
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    CollectorTestSink controlSink = new CollectorTestSink();
+
+    windowedOperator.controlOutput.setSink(controlSink);
+
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    windowedOperator.setAllowedLateness(Duration.millis(1000));
+    // The implicit watermark trails the latest event time by 100.
+    windowedOperator.setImplicitWatermarkGenerator(new FixedDiffEventTimeWatermarkGen(100));
+
+    windowedOperator.setup(testMeta.operatorContext);
+
+    // No tuples yet: the generated watermark (-1 - 100) is not positive, so none is expected.
+    windowedOperator.beginWindow(1);
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get no watermark tuple", 0, controlSink.getCount(false));
+
+    windowedOperator.beginWindow(2);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
+    Assert.assertEquals("Check Watermark value", BASE,
+        ((ControlTuple.Watermark)controlSink.collectedTuples.get(0)).getTimestamp());
+
+    windowedOperator.beginWindow(3);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L));
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
+    Assert.assertEquals("Check Watermark value", BASE + 800,
+        ((ControlTuple.Watermark)controlSink.collectedTuples.get(1)).getTimestamp());
+
+    windowedOperator.teardown();
+  }
+
private void testTrigger(TriggerOption.AccumulationMode accumulationMode)
{
WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
[2/2] apex-malhar git commit: Merge commit 'refs/pull/534/head' of github.com:apache/apex-malhar
Posted by da...@apache.org.
Merge commit 'refs/pull/534/head' of github.com:apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f22b269a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f22b269a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f22b269a
Branch: refs/heads/master
Commit: f22b269a993ec184ca6f09b43319c68a7bf91ba5
Parents: 88295ae 1290c72
Author: David Yan <da...@apache.org>
Authored: Wed Jan 25 23:03:32 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Wed Jan 25 23:03:32 2017 -0800
----------------------------------------------------------------------
.../lib/window/ImplicitWatermarkGenerator.java | 49 ++++++++++++
.../window/impl/AbstractWindowedOperator.java | 27 ++++++-
.../impl/FixedDiffEventTimeWatermarkGen.java | 83 ++++++++++++++++++++
.../FixedDiffProcessingTimeWatermarkGen.java | 76 ++++++++++++++++++
.../malhar/lib/window/WindowedOperatorTest.java | 34 ++++++++
5 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------