You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/08/29 20:11:09 UTC

[2/5] flink git commit: [FLINK-7245] [table] Add operators to hold back watermarks with static delays.

[FLINK-7245] [table] Add operators to hold back watermarks with static delays.

This closes #4530.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68fdaa57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68fdaa57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68fdaa57

Branch: refs/heads/master
Commit: 68fdaa57e35b8ee30a262aad4d26926b18054c57
Parents: 1fc0b64
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Aug 9 20:54:16 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 29 22:10:17 2017 +0200

----------------------------------------------------------------------
 ...yedCoProcessOperatorWithWatermarkDelay.scala | 58 ++++++++++++
 ...KeyedProcessOperatorWithWatermarkDelay.scala | 59 +++++++++++++
 ...oProcessOperatorWithWatermarkDelayTest.scala | 93 ++++++++++++++++++++
 ...dProcessOperatorWithWatermarkDelayTest.scala | 78 ++++++++++++++++
 .../api/operators/AbstractStreamOperator.java   |  2 +-
 .../operators/InternalTimeServiceManager.java   |  2 +-
 6 files changed, 290 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
new file mode 100644
index 0000000..f25de25
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+  * A [[KeyedCoProcessOperator]] that supports holding back watermarks with a static delay.
+  *
+  * Each watermark passed to [[processWatermark]] is first used, unmodified, to advance the
+  * operator's time service (if there is one) and is then emitted downstream with its
+  * timestamp reduced by `watermarkDelay`.
+  *
+  * @param flatMapper     the function applied to the elements of both inputs
+  * @param watermarkDelay the non-negative amount by which emitted watermarks are held back
+  * @throws IllegalArgumentException if `watermarkDelay` is negative
+  */
+class KeyedCoProcessOperatorWithWatermarkDelay[KEY, IN1, IN2, OUT](
+    private val flatMapper: CoProcessFunction[IN1, IN2, OUT],
+    private val watermarkDelay: Long = 0L)
+  extends KeyedCoProcessOperator[KEY, IN1, IN2, OUT](flatMapper) {
+
+  if (watermarkDelay < 0) {
+    throw new IllegalArgumentException("The watermark delay should be non-negative.")
+  }
+
+  /** Emits the watermark unchanged. */
+  def emitWithoutDelay(mark: Watermark): Unit = output.emitWatermark(mark)
+
+  /**
+    * Emits the watermark with its timestamp reduced by `watermarkDelay`.
+    *
+    * The subtraction saturates at `Long.MinValue`: a plain `timestamp - watermarkDelay` would
+    * wrap around to a large positive timestamp for inputs close to `Long.MinValue` and thus
+    * advance event time downstream instead of holding it back.
+    */
+  def emitWithDelay(mark: Watermark): Unit = {
+    val timestamp = mark.getTimestamp
+    val delayed =
+      if (timestamp < Long.MinValue + watermarkDelay) Long.MinValue
+      else timestamp - watermarkDelay
+    output.emitWatermark(new Watermark(delayed))
+  }
+
+  // Pick the emitter once at construction time so that processing a watermark needs no branch.
+  val emitter: Watermark => Unit = if (watermarkDelay == 0) {
+    emitWithoutDelay
+  } else {
+    emitWithDelay
+  }
+
+  /**
+    * Advances the operator's time service (if there is one) with the unmodified watermark and
+    * then forwards the watermark downstream through [[emitter]].
+    */
+  @throws[Exception]
+  override def processWatermark(mark: Watermark): Unit = {
+    if (timeServiceManager != null) timeServiceManager.advanceWatermark(mark)
+
+    emitter(mark)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
new file mode 100644
index 0000000..74b4773
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+  * A [[KeyedProcessOperator]] that supports holding back watermarks with a static delay.
+  *
+  * Each watermark passed to [[processWatermark]] is first used, unmodified, to advance the
+  * operator's time service (if there is one) and is then emitted downstream with its
+  * timestamp reduced by `watermarkDelay`.
+  *
+  * @param function       the function applied to the input elements
+  * @param watermarkDelay the non-negative amount by which emitted watermarks are held back
+  * @throws IllegalArgumentException if `watermarkDelay` is negative
+  */
+class KeyedProcessOperatorWithWatermarkDelay[KEY, IN, OUT](
+    private val function: ProcessFunction[IN, OUT],
+    private val watermarkDelay: Long = 0L)
+  extends KeyedProcessOperator[KEY, IN, OUT](function) {
+
+  if (watermarkDelay < 0) {
+    throw new IllegalArgumentException("The watermark delay should be non-negative.")
+  }
+
+  /** Emits the watermark unchanged. */
+  def emitWithoutDelay(mark: Watermark): Unit = output.emitWatermark(mark)
+
+  /**
+    * Emits the watermark with its timestamp reduced by `watermarkDelay`.
+    *
+    * The subtraction saturates at `Long.MinValue`: a plain `timestamp - watermarkDelay` would
+    * wrap around to a large positive timestamp for inputs close to `Long.MinValue` and thus
+    * advance event time downstream instead of holding it back.
+    */
+  def emitWithDelay(mark: Watermark): Unit = {
+    val timestamp = mark.getTimestamp
+    val delayed =
+      if (timestamp < Long.MinValue + watermarkDelay) Long.MinValue
+      else timestamp - watermarkDelay
+    output.emitWatermark(new Watermark(delayed))
+  }
+
+  // Pick the emitter once at construction time so that processing a watermark needs no branch.
+  val emitter: Watermark => Unit = if (watermarkDelay == 0) {
+    emitWithoutDelay
+  } else {
+    emitWithDelay
+  }
+
+  /**
+    * Advances the operator's time service (if there is one) with the unmodified watermark and
+    * then forwards the watermark downstream through [[emitter]].
+    */
+  @throws[Exception]
+  override def processWatermark(mark: Watermark): Unit = {
+    if (timeServiceManager != null) timeServiceManager.advanceWatermark(mark)
+
+    emitter(mark)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
new file mode 100644
index 0000000..243a034
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+import org.junit.Test
+
+/**
+  * Tests [[KeyedCoProcessOperatorWithWatermarkDelay]].
+  */
+class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+  private type Harness = KeyedTwoInputStreamOperatorTestHarness[String, Integer, String, String]
+
+  @Test
+  def testHoldingBackWatermarks(): Unit = {
+    // only the minimum of both inputs' watermarks (101, then 103) advances the operator
+    assertEmittedWatermarks(delay = 100, expectedTimestamps = Seq(1L, 3L)) { harness =>
+      harness.processWatermark1(new Watermark(101))
+      harness.processWatermark2(new Watermark(202))
+      harness.processWatermark1(new Watermark(103))
+      harness.processWatermark2(new Watermark(204))
+    }
+  }
+
+  @Test
+  def testZeroDelayForwardsWatermarksUnchanged(): Unit = {
+    assertEmittedWatermarks(delay = 0, expectedTimestamps = Seq(101L, 103L)) { harness =>
+      harness.processWatermark1(new Watermark(101))
+      harness.processWatermark2(new Watermark(202))
+      harness.processWatermark1(new Watermark(103))
+      harness.processWatermark2(new Watermark(204))
+    }
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testDelayParameter(): Unit = {
+    new KeyedCoProcessOperatorWithWatermarkDelay[String, Integer, String, String](
+      new EmptyCoProcessFunction, -1)
+  }
+
+  /**
+    * Runs a [[KeyedCoProcessOperatorWithWatermarkDelay]] with the given delay, lets `feed`
+    * push watermarks into it and asserts that exactly the expected watermarks are emitted.
+    * The harness is closed even if the assertion fails.
+    */
+  private def assertEmittedWatermarks(delay: Long, expectedTimestamps: Seq[Long])(
+      feed: Harness => Unit): Unit = {
+    val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, Integer, String, String](
+      new EmptyCoProcessFunction, delay)
+    val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, Integer, String, String](
+      operator,
+      new IntToStringKeySelector, new CoIdentityKeySelector[String],
+      BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.setup()
+    testHarness.open()
+    try {
+      feed(testHarness)
+
+      val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+      expectedTimestamps.foreach(ts => expectedOutput.add(new Watermark(ts)))
+
+      TestHarnessUtil.assertOutputEquals(
+        "Output was not correct.",
+        expectedOutput,
+        testHarness.getOutput)
+    } finally {
+      testHarness.close()
+    }
+  }
+}
+
+/** A [[CoProcessFunction]] that ignores all elements; the tests only check watermarks. */
+private class EmptyCoProcessFunction extends CoProcessFunction[Integer, String, String] {
+  override def processElement1(
+    value: Integer,
+    ctx: CoProcessFunction[Integer, String, String]#Context,
+    out: Collector[String]): Unit = {
+    // do nothing
+  }
+
+  override def processElement2(
+    value: String,
+    ctx: CoProcessFunction[Integer, String, String]#Context,
+    out: Collector[String]): Unit = {
+    // do nothing
+  }
+}
+
+/** Keys an [[Integer]] element by its string representation. */
+private class IntToStringKeySelector extends KeySelector[Integer, String] {
+  override def getKey(value: Integer): String = String.valueOf(value)
+}
+
+/** Uses each element itself as its key. */
+private class CoIdentityKeySelector[T] extends KeySelector[T, T] {
+  override def getKey(value: T): T = value
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
new file mode 100644
index 0000000..d419453
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+import org.junit.Test
+
+/**
+  * Tests [[KeyedProcessOperatorWithWatermarkDelay]].
+  */
+class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+  private type Harness = KeyedOneInputStreamOperatorTestHarness[Integer, Integer, String]
+
+  @Test
+  def testHoldingBackWatermarks(): Unit = {
+    assertEmittedWatermarks(delay = 100, expectedTimestamps = Seq(1L, 3L)) { harness =>
+      harness.processWatermark(new Watermark(101))
+      harness.processWatermark(new Watermark(103))
+    }
+  }
+
+  @Test
+  def testZeroDelayForwardsWatermarksUnchanged(): Unit = {
+    assertEmittedWatermarks(delay = 0, expectedTimestamps = Seq(101L, 103L)) { harness =>
+      harness.processWatermark(new Watermark(101))
+      harness.processWatermark(new Watermark(103))
+    }
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testDelayParameter(): Unit = {
+    new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String](
+      new EmptyProcessFunction, -1)
+  }
+
+  /**
+    * Runs a [[KeyedProcessOperatorWithWatermarkDelay]] with the given delay, lets `feed` push
+    * watermarks into it and asserts that exactly the expected watermarks are emitted.
+    * The harness is closed even if the assertion fails.
+    */
+  private def assertEmittedWatermarks(delay: Long, expectedTimestamps: Seq[Long])(
+      feed: Harness => Unit): Unit = {
+    val operator = new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String](
+      new EmptyProcessFunction, delay)
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, String](
+      operator, new IdentityKeySelector, BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.setup()
+    testHarness.open()
+    try {
+      feed(testHarness)
+
+      val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+      expectedTimestamps.foreach(ts => expectedOutput.add(new Watermark(ts)))
+
+      TestHarnessUtil.assertOutputEquals(
+        "Output was not correct.",
+        expectedOutput,
+        testHarness.getOutput)
+    } finally {
+      testHarness.close()
+    }
+  }
+}
+
+/** A [[ProcessFunction]] that ignores all elements; the tests only check watermarks. */
+private class EmptyProcessFunction extends ProcessFunction[Integer, String] {
+  override def processElement(
+    value: Integer,
+    ctx: ProcessFunction[Integer, String]#Context,
+    out: Collector[String]): Unit = {
+    // do nothing
+  }
+}
+
+/** Uses each element itself as its key. */
+private class IdentityKeySelector[T] extends KeySelector[T, T] {
+  override def getKey(value: T): T = value
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a28fc30..fc043a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -160,7 +160,7 @@ public abstract class AbstractStreamOperator<OUT>
 
 	// ---------------- time handler ------------------
 
-	private transient InternalTimeServiceManager<?, ?> timeServiceManager;
+	protected transient InternalTimeServiceManager<?, ?> timeServiceManager;
 
 	// ---------------- two-input operator watermarks ------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 17af3aa..7d5cb91 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -44,7 +44,7 @@ import java.util.Map;
  * @param <N> The type of namespace used for the timers.
  */
 @Internal
-class InternalTimeServiceManager<K, N> {
+public class InternalTimeServiceManager<K, N> {
 
 	private final int totalKeyGroups;
 	private final KeyGroupsList localKeyGroupRange;