You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/26 07:31:49 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2354 Support for heuristic watermark

Repository: apex-malhar
Updated Branches:
  refs/heads/master 88295aed8 -> f22b269a9


APEXMALHAR-2354 Support for heuristic watermark


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1290c726
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1290c726
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1290c726

Branch: refs/heads/master
Commit: 1290c72649cd925fa795fd1234b29499ecb87189
Parents: cf896b0
Author: bhupeshchawda <bh...@apache.org>
Authored: Mon Jan 16 18:23:00 2017 +0530
Committer: bhupeshchawda <bh...@apache.org>
Committed: Tue Jan 24 18:08:56 2017 +0530

----------------------------------------------------------------------
 .../lib/window/ImplicitWatermarkGenerator.java  | 49 ++++++++++++
 .../window/impl/AbstractWindowedOperator.java   | 27 ++++++-
 .../impl/FixedDiffEventTimeWatermarkGen.java    | 83 ++++++++++++++++++++
 .../FixedDiffProcessingTimeWatermarkGen.java    | 76 ++++++++++++++++++
 .../malhar/lib/window/WindowedOperatorTest.java | 34 ++++++++
 5 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java
new file mode 100644
index 0000000..c7e91a1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/ImplicitWatermarkGenerator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Component;
+
+/**
+ * Interface for generators for implicit watermarks.
+ * An operator that does not want to rely on explicit watermarks (generated from upstream)
+ * can use implementations of this interface to get implicit watermarks.
+ */
+@InterfaceStability.Evolving
+public interface ImplicitWatermarkGenerator extends Component
+{
+  /**
+   * Called on arrival of every tuple.
+   * Implementations may use this call to update the internal state from which watermarks are derived.
+   * @param t the incoming windowed tuple
+   * @param currentProcessingTime the current notion of processing time
+   *                              (usually the system time generated based on the window id)
+   */
+  void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime);
+
+  /**
+   * Returns the current watermark tuple as per the generator's state
+   * @param currentProcessingTime the current notion of processing time
+   *                              (usually the system time generated based on the window id)
+   * @return the latest watermark tuple created based on the implementation
+   */
+  ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 22e8525..b755eb4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.apex.malhar.lib.state.spillable.WindowListener;
 import org.apache.apex.malhar.lib.window.Accumulation;
 import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Window;
@@ -92,6 +93,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   protected long fixedWatermarkMillis = -1;
   private transient long streamingWindowId;
   private transient TreeMap<Long, Long> streamingWindowToLatenessHorizon = new TreeMap<>();
+  private ImplicitWatermarkGenerator implicitWatermarkGenerator;
 
   private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();
 
@@ -148,6 +150,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
       // do the accumulation
       accumulateTuple(windowedTuple);
       processWindowState(windowedTuple);
+      if (implicitWatermarkGenerator != null) {
+        implicitWatermarkGenerator.processTupleForWatermark(windowedTuple, currentDerivedTimestamp);
+      }
     }
   }
 
@@ -264,6 +269,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     this.nextWatermark = timestamp;
   }
 
+
   /**
    * Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we
    * don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally
@@ -276,6 +282,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     this.fixedWatermarkMillis = millis;
   }
 
+  public void setImplicitWatermarkGenerator(ImplicitWatermarkGenerator implicitWatermarkGenerator)
+  {
+    this.implicitWatermarkGenerator = implicitWatermarkGenerator;
+  }
+
   public void validate() throws ValidationException
   {
     if (accumulation == null) {
@@ -429,6 +440,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     if (retractionStorage != null) {
       retractionStorage.setup(context);
     }
+    if (implicitWatermarkGenerator != null) {
+      implicitWatermarkGenerator.setup(context);
+    }
     for (Component component : components.values()) {
       component.setup(context);
     }
@@ -445,6 +459,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     if (retractionStorage != null) {
       retractionStorage.teardown();
     }
+    if (implicitWatermarkGenerator != null) {
+      implicitWatermarkGenerator.teardown();
+    }
     for (Component component : components.values()) {
       component.teardown();
     }
@@ -490,9 +507,15 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
 
   protected void processWatermarkAtEndWindow()
   {
-    if (fixedWatermarkMillis > 0) {
-      nextWatermark = currentDerivedTimestamp - fixedWatermarkMillis;
+    long implicitWatermark = -1;
+    if (implicitWatermarkGenerator != null) {
+      implicitWatermark = implicitWatermarkGenerator
+          .getWatermarkTuple(currentDerivedTimestamp).getTimestamp();
     }
+    if (implicitWatermark > nextWatermark) {
+      nextWatermark = implicitWatermark;
+    }
+
     if (nextWatermark > 0 && currentWatermark < nextWatermark) {
 
       long horizon = nextWatermark - allowedLatenessMillis;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java
new file mode 100644
index 0000000..1e61a29
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffEventTimeWatermarkGen.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import javax.annotation.Nonnegative;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * An {@link ImplicitWatermarkGenerator} implementation for generating watermarks
+ * based on event time.
+ *
+ * Generates a watermark tuple with a fixed difference from the latest event time.
+ */
+@InterfaceStability.Evolving
+public class FixedDiffEventTimeWatermarkGen implements ImplicitWatermarkGenerator
+{
+  @Nonnegative
+  private long fixedDifference;
+  private long maxEventTime = -1;
+
+  public FixedDiffEventTimeWatermarkGen(long fixedDifference)
+  {
+    this.fixedDifference = fixedDifference;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime)
+  {
+    long eventTime = t.getTimestamp();
+    if (maxEventTime < eventTime) {
+      maxEventTime = eventTime;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime)
+  {
+    return new WatermarkImpl(maxEventTime - fixedDifference);
+  }
+
+  @Override
+  public void setup(Context context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  public void setFixedDifference(long fixedDifference)
+  {
+    this.fixedDifference = fixedDifference;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java
new file mode 100644
index 0000000..a4d9e1a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/FixedDiffProcessingTimeWatermarkGen.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.ImplicitWatermarkGenerator;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * An {@link ImplicitWatermarkGenerator} implementation for generating watermarks
+ * based on processing time.
+ *
+ * Generates a watermark tuple with a fixed difference from the current processing time.
+ */
+@InterfaceStability.Evolving
+public class FixedDiffProcessingTimeWatermarkGen implements ImplicitWatermarkGenerator
+{
+  long fixedWatermarkMillis = -1;
+
+  public FixedDiffProcessingTimeWatermarkGen(long fixedWatermarkMillis)
+  {
+    this.fixedWatermarkMillis = fixedWatermarkMillis;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void processTupleForWatermark(Tuple.WindowedTuple t, long currentProcessingTime)
+  {
+    // do nothing
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ControlTuple.Watermark getWatermarkTuple(long currentProcessingTime)
+  {
+    return new WatermarkImpl(currentProcessingTime - fixedWatermarkMillis);
+  }
+
+  @Override
+  public void setup(Context context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  public void setFixedWatermarkMillis(long fixedWatermarkMillis)
+  {
+    this.fixedWatermarkMillis = fixedWatermarkMillis;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1290c726/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 512626e..39f4b6e 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -36,6 +36,7 @@ import org.junit.runners.Parameterized;
 
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
 import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.window.impl.FixedDiffEventTimeWatermarkGen;
 import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
@@ -218,6 +219,39 @@ public class WindowedOperatorTest
     windowedOperator.teardown();
   }
 
+  @Test
+  public void testImplicitWatermarks()
+  {
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
+    CollectorTestSink controlSink = new CollectorTestSink();
+
+    windowedOperator.controlOutput.setSink(controlSink);
+
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
+    windowedOperator.setAllowedLateness(Duration.millis(1000));
+    windowedOperator.setImplicitWatermarkGenerator(new FixedDiffEventTimeWatermarkGen(100));
+
+    windowedOperator.setup(testMeta.operatorContext);
+
+    windowedOperator.beginWindow(1);
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get no watermark tuple", 0, controlSink.getCount(false));
+
+    windowedOperator.beginWindow(2);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L));
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false));
+    Assert.assertEquals("Check Watermark value",
+        ((ControlTuple.Watermark)controlSink.collectedTuples.get(0)).getTimestamp(), BASE);
+
+    windowedOperator.beginWindow(3);
+    windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L));
+    windowedOperator.endWindow();
+    Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false));
+    Assert.assertEquals("Check Watermark value",
+        ((ControlTuple.Watermark)controlSink.collectedTuples.get(1)).getTimestamp(), BASE + 800);
+  }
+
   private void testTrigger(TriggerOption.AccumulationMode accumulationMode)
   {
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();


[2/2] apex-malhar git commit: Merge commit 'refs/pull/534/head' of github.com:apache/apex-malhar

Posted by da...@apache.org.
Merge commit 'refs/pull/534/head' of github.com:apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f22b269a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f22b269a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f22b269a

Branch: refs/heads/master
Commit: f22b269a993ec184ca6f09b43319c68a7bf91ba5
Parents: 88295ae 1290c72
Author: David Yan <da...@apache.org>
Authored: Wed Jan 25 23:03:32 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Wed Jan 25 23:03:32 2017 -0800

----------------------------------------------------------------------
 .../lib/window/ImplicitWatermarkGenerator.java  | 49 ++++++++++++
 .../window/impl/AbstractWindowedOperator.java   | 27 ++++++-
 .../impl/FixedDiffEventTimeWatermarkGen.java    | 83 ++++++++++++++++++++
 .../FixedDiffProcessingTimeWatermarkGen.java    | 76 ++++++++++++++++++
 .../malhar/lib/window/WindowedOperatorTest.java | 34 ++++++++
 5 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------