You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/17 08:19:53 UTC

[GitHub] pnowojski closed pull request #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction

pnowojski closed pull request #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index c2c130ef58d..faf8fc7943a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -84,6 +85,15 @@
 	 */
 	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
 
+	/**
+	 * Called when the watermark has advanced.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to
+	 */
+	public void processWatermark(Watermark mark, Collector<O> out) throws Exception {
+	}
+
 	/**
 	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
 	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index 20c10840c2c..39a9a7d2cd5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -98,6 +99,39 @@
 	 */
 	public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
 
+	/**
+	 * Called when the combined watermark of both inputs has advanced.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to
+	 */
+	public void processWatermark(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
+	/**
+	 * Called when the watermark of the first input has advanced. If this update also advances
+	 * the combined watermark, this call is followed by a {@link #processWatermark(Watermark, Collector)}
+	 * call.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to. Results emitted will have a timestamp
+	 *            set to the value before advancing combined watermark.
+	 */
+	public void processWatermark1(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
+	/**
+	 * Called when the watermark of the second input has advanced. If this update also advances
+	 * the combined watermark, this call is followed by a {@link #processWatermark(Watermark, Collector)}
+	 * call.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to. Results emitted will have a timestamp
+	 *            set to the value before advancing combined watermark.
+	 */
+	public void processWatermark2(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
 	/**
 	 * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
 	 * {@link #processElement2(Object, Context, Collector)}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 25c93a60b3d..bd18c40378b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -69,6 +69,8 @@ public void processElement(StreamRecord<IN> element) throws Exception {
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
+		collector.setAbsoluteTimestamp(mark.getTimestamp());
+		userFunction.processWatermark(mark, collector);
 		super.processWatermark(mark);
 		this.currentWatermark = mark.getTimestamp();
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index f6f2846bc25..fda22856a67 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -78,8 +78,25 @@ public void processElement2(StreamRecord<IN2> element) throws Exception {
 		context.element = null;
 	}
 
+	@Override
+	public void processWatermark1(Watermark mark) throws Exception {
+		collector.setAbsoluteTimestamp(currentWatermark);
+		userFunction.processWatermark1(mark, collector);
+		super.processWatermark1(mark);
+	}
+
+	@Override
+	public void processWatermark2(Watermark mark) throws Exception {
+		collector.setAbsoluteTimestamp(currentWatermark);
+		userFunction.processWatermark2(mark, collector);
+		super.processWatermark2(mark);
+	}
+
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
+		collector.setAbsoluteTimestamp(mark.getTimestamp());
+		userFunction.processWatermark(mark, collector);
+
 		super.processWatermark(mark);
 		currentWatermark = mark.getTimestamp();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index 2d9d4d39496..051c900bc78 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -42,6 +42,28 @@
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
+	@Test
+	public void testProcessWatermark() throws Exception {
+		try (OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				 new OneInputStreamOperatorTestHarness<>(new ProcessOperator<>(new OutputProcessWatermarkArguments()))) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+			testHarness.processWatermark(new Watermark(42));
+			expectedOutput.add(new StreamRecord<>("W_42", 42L));
+			expectedOutput.add(new Watermark(42L));
+
+			testHarness.processWatermark(new Watermark(44));
+			expectedOutput.add(new StreamRecord<>("W_44", 44L));
+			expectedOutput.add(new Watermark(44L));
+
+			TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+		}
+	}
+
 	@Test
 	public void testTimestampAndWatermarkQuerying() throws Exception {
 
@@ -209,4 +231,15 @@ public void onTimer(
 				Collector<String> out) throws Exception {
 		}
 	}
+
+	private static class OutputProcessWatermarkArguments<IN> extends ProcessFunction<IN, String> {
+		@Override
+		public void processElement(IN value, Context ctx, Collector<String> out) throws Exception {
+		}
+
+		@Override
+		public void processWatermark(Watermark mark, Collector<String> out) throws Exception {
+			out.collect("W_" + mark.getTimestamp());
+		}
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index beb5bf554c0..47e827ec0ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -35,6 +35,29 @@
  */
 public class CoProcessOperatorTest extends TestLogger {
 
+	@Test
+	public void testProcessWatermark() throws Exception {
+
+		try (TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+			new TwoInputStreamOperatorTestHarness<>(new CoProcessOperator<>(new OutputProcessWatermarkArguments()))) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+			testHarness.processWatermark1(new Watermark(42));
+			expectedOutput.add(new StreamRecord<>("W1_42", Long.MIN_VALUE));
+
+			testHarness.processWatermark2(new Watermark(44));
+			expectedOutput.add(new StreamRecord<>("W2_44", Long.MIN_VALUE));
+			expectedOutput.add(new StreamRecord<>("W_42", 42L));
+			expectedOutput.add(new Watermark(42L));
+
+			TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+		}
+	}
+
 	@Test
 	public void testTimestampAndWatermarkQuerying() throws Exception {
 
@@ -138,4 +161,29 @@ public void onTimer(
 				Collector<String> out) throws Exception {
 		}
 	}
+
+	private static class OutputProcessWatermarkArguments<IN1, IN2> extends CoProcessFunction<IN1, IN2, String> {
+		@Override
+		public void processElement1(IN1 value, Context ctx, Collector<String> out) throws Exception {
+		}
+
+		@Override
+		public void processElement2(IN2 value, Context ctx, Collector<String> out) throws Exception {
+		}
+
+		@Override
+		public void processWatermark(Watermark mark, Collector<String> out) {
+			out.collect("W_" + mark.getTimestamp());
+		}
+
+		@Override
+		public void processWatermark1(Watermark mark, Collector<String> out) {
+			out.collect("W1_" + mark.getTimestamp());
+		}
+
+		@Override
+		public void processWatermark2(Watermark mark, Collector<String> out) {
+			out.collect("W2_" + mark.getTimestamp());
+		}
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services