You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/08/29 20:11:09 UTC
[2/5] flink git commit: [FLINK-7245] [table] Add operators to hold
back watermarks with static delays.
[FLINK-7245] [table] Add operators to hold back watermarks with static delays.
This closes #4530.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68fdaa57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68fdaa57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68fdaa57
Branch: refs/heads/master
Commit: 68fdaa57e35b8ee30a262aad4d26926b18054c57
Parents: 1fc0b64
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Aug 9 20:54:16 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 29 22:10:17 2017 +0200
----------------------------------------------------------------------
...yedCoProcessOperatorWithWatermarkDelay.scala | 58 ++++++++++++
...KeyedProcessOperatorWithWatermarkDelay.scala | 59 +++++++++++++
...oProcessOperatorWithWatermarkDelayTest.scala | 93 ++++++++++++++++++++
...dProcessOperatorWithWatermarkDelayTest.scala | 78 ++++++++++++++++
.../api/operators/AbstractStreamOperator.java | 2 +-
.../operators/InternalTimeServiceManager.java | 2 +-
6 files changed, 290 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
new file mode 100644
index 0000000..f25de25
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+ * A [[KeyedCoProcessOperator]] that holds back watermarks by a static, non-negative delay;
+ * a negative `watermarkDelay` is rejected with an [[IllegalArgumentException]].
+ */
+class KeyedCoProcessOperatorWithWatermarkDelay[KEY, IN1, IN2, OUT](
+ private val flatMapper: CoProcessFunction[IN1, IN2, OUT],
+ private val watermarkDelay: Long = 0L)
+ extends KeyedCoProcessOperator[KEY, IN1, IN2, OUT](flatMapper) {
+
+ // validate the delay first so that a misconfigured operator fails at construction time
+ if (watermarkDelay < 0) {
+ throw new IllegalArgumentException("The watermark delay should be non-negative.")
+ }
+
+ /** Forwards `mark` to the output unchanged. */
+ def emitWithoutDelay(mark: Watermark): Unit = output.emitWatermark(mark)
+
+ /** Emits a new watermark whose timestamp is that of `mark` minus `watermarkDelay`. */
+ def emitWithDelay(mark: Watermark): Unit =
+ output.emitWatermark(new Watermark(mark.getTimestamp - watermarkDelay))
+
+ /** Emit strategy, fixed at construction: a zero delay forwards the watermark as-is. */
+ val emitter: Watermark => Unit =
+ if (watermarkDelay == 0) emitWithoutDelay else emitWithDelay
+
+ /**
+ * Hands `mark` to `timeServiceManager.advanceWatermark` when a manager is present, then
+ * emits it through `emitter`, i.e. held back by `watermarkDelay`.
+ */
+ @throws[Exception]
+ override def processWatermark(mark: Watermark): Unit = {
+ if (timeServiceManager != null) timeServiceManager.advanceWatermark(mark)
+ emitter(mark)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
new file mode 100644
index 0000000..74b4773
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+ * A [[KeyedProcessOperator]] that holds back watermarks by a static, non-negative delay;
+ * a negative `watermarkDelay` is rejected with an [[IllegalArgumentException]].
+ */
+class KeyedProcessOperatorWithWatermarkDelay[KEY, IN, OUT](
+ private val function: ProcessFunction[IN, OUT],
+ private val watermarkDelay: Long = 0L)
+ extends KeyedProcessOperator[KEY, IN, OUT](function) {
+
+ // validate the delay first so that a misconfigured operator fails at construction time
+ if (watermarkDelay < 0) {
+ throw new IllegalArgumentException("The watermark delay should be non-negative.")
+ }
+
+ /** Forwards `mark` to the output unchanged. */
+ def emitWithoutDelay(mark: Watermark): Unit = output.emitWatermark(mark)
+
+ /** Emits a new watermark whose timestamp is that of `mark` minus `watermarkDelay`. */
+ def emitWithDelay(mark: Watermark): Unit =
+ output.emitWatermark(new Watermark(mark.getTimestamp - watermarkDelay))
+
+ /** Emit strategy, fixed at construction: a zero delay forwards the watermark as-is. */
+ val emitter: Watermark => Unit =
+ if (watermarkDelay == 0) emitWithoutDelay else emitWithDelay
+
+ /**
+ * Hands `mark` to `timeServiceManager.advanceWatermark` when a manager is present, then
+ * emits it through `emitter`, i.e. held back by `watermarkDelay`.
+ */
+ @throws[Exception]
+ override def processWatermark(mark: Watermark): Unit = {
+ if (timeServiceManager != null) timeServiceManager.advanceWatermark(mark)
+ emitter(mark)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
new file mode 100644
index 0000000..243a034
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+import org.junit.Test
+
+/**
+ * Tests the watermark delay and argument check of [[KeyedCoProcessOperatorWithWatermarkDelay]].
+ */
+class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+ @Test
+ def testHoldingBackWatermarks(): Unit = {
+ val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, Integer, String, String](
+ new EmptyCoProcessFunction, 100)
+ val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, Integer, String, String](
+ operator,
+ new IntToStringKeySelector, new CoIdentityKeySelector[String],
+ BasicTypeInfo.STRING_TYPE_INFO)
+ // NOTE(review): output presumably follows the minimum of both inputs' watermarks - confirm
+ testHarness.setup()
+ testHarness.open()
+ testHarness.processWatermark1(new Watermark(101))
+ testHarness.processWatermark2(new Watermark(202))
+ testHarness.processWatermark1(new Watermark(103))
+ testHarness.processWatermark2(new Watermark(204))
+ // with a delay of 100: 101 - 100 = 1 and 103 - 100 = 3; only two watermarks are expected
+ val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+ expectedOutput.add(new Watermark(1))
+ expectedOutput.add(new Watermark(3))
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput)
+
+ testHarness.close()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testDelayParameter(): Unit = {
+ new KeyedCoProcessOperatorWithWatermarkDelay[AnyRef, Integer, String, String](
+ new EmptyCoProcessFunction, -1)
+ }
+}
+
+private class EmptyCoProcessFunction extends CoProcessFunction[Integer, String, String] {
+ override def processElement1(
+ value: Integer,
+ ctx: CoProcessFunction[Integer, String, String]#Context,
+ out: Collector[String]): Unit = {
+ // intentionally empty: the tests in this file only push watermarks, never elements
+ }
+
+ override def processElement2(
+ value: String,
+ ctx: CoProcessFunction[Integer, String, String]#Context,
+ out: Collector[String]): Unit = {
+ // intentionally empty, same as processElement1
+ }
+}
+
+/** Derives the key of an [[Integer]] element as its `String.valueOf` representation. */
+private class IntToStringKeySelector extends KeySelector[Integer, String] {
+ override def getKey(value: Integer): String = String.valueOf(value)
+}
+/** Key selector that uses the element itself as its key. */
+private class CoIdentityKeySelector[T] extends KeySelector[T, T] {
+ override def getKey(value: T): T = value
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
new file mode 100644
index 0000000..d419453
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+import org.junit.Test
+
+/**
+ * Tests the watermark delay and argument check of [[KeyedProcessOperatorWithWatermarkDelay]].
+ */
+class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+ @Test
+ def testHoldingBackWatermarks(): Unit = {
+ val operator = new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String](
+ new EmptyProcessFunction, 100)
+ val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, String](
+ operator, new IdentityKeySelector, BasicTypeInfo.INT_TYPE_INFO)
+ // no elements are processed: only watermarks are pushed through the operator
+ testHarness.setup()
+ testHarness.open()
+ testHarness.processWatermark(new Watermark(101))
+ testHarness.processWatermark(new Watermark(103))
+ // with a delay of 100 the expected watermarks are 101 - 100 = 1 and 103 - 100 = 3
+ val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+ expectedOutput.add(new Watermark(1))
+ expectedOutput.add(new Watermark(3))
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput)
+
+ testHarness.close()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testDelayParameter(): Unit = {
+ new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String](
+ new EmptyProcessFunction, -1)
+ }
+}
+
+private class EmptyProcessFunction extends ProcessFunction[Integer, String] {
+ override def processElement(
+ value: Integer,
+ ctx: ProcessFunction[Integer, String]#Context,
+ out: Collector[String]): Unit = {
+ // intentionally empty: the tests in this file only push watermarks, never elements
+ }
+}
+/** Key selector that uses the element itself as its key. */
+private class IdentityKeySelector[T] extends KeySelector[T, T] {
+ override def getKey(value: T): T = value
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a28fc30..fc043a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -160,7 +160,7 @@ public abstract class AbstractStreamOperator<OUT>
// ---------------- time handler ------------------
- private transient InternalTimeServiceManager<?, ?> timeServiceManager;
+ protected transient InternalTimeServiceManager<?, ?> timeServiceManager;
// ---------------- two-input operator watermarks ------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/68fdaa57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 17af3aa..7d5cb91 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -44,7 +44,7 @@ import java.util.Map;
* @param <N> The type of namespace used for the timers.
*/
@Internal
-class InternalTimeServiceManager<K, N> {
+public class InternalTimeServiceManager<K, N> {
private final int totalKeyGroups;
private final KeyGroupsList localKeyGroupRange;