You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/01 10:16:25 UTC

[flink] branch master updated: [FLINK-13491][datastream] correctly support endInput in AsyncWaitOperator

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cf763d5  [FLINK-13491][datastream] correctly support endInput in AsyncWaitOperator
cf763d5 is described below

commit cf763d51a8941f48733c08e924c340573d537f3f
Author: Biao Liu <mm...@gmail.com>
AuthorDate: Thu Aug 1 18:16:14 2019 +0800

    [FLINK-13491][datastream] correctly support endInput in AsyncWaitOperator
    
    Before completing endInput, the operator has to make sure that all of the records have been emitted.
---
 .../api/operators/async/AsyncWaitOperator.java     | 26 ++++++---
 .../api/operators/async/AsyncWaitOperatorTest.java | 68 ++++++++++++++++++++++
 .../util/OneInputStreamOperatorTestHarness.java    |  9 +++
 3 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index f875775..3ae6abd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -76,7 +77,7 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public class AsyncWaitOperator<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT>, OperatorActions {
+		implements OneInputStreamOperator<IN, OUT>, OperatorActions, BoundedOneInput {
 	private static final long serialVersionUID = 1L;
 
 	private static final String STATE_NAME = "_async_wait_operator_state_";
@@ -273,15 +274,14 @@ public class AsyncWaitOperator<IN, OUT>
 	}
 
 	@Override
+	public void endInput() throws Exception {
+		waitInFlightInputsFinished(); // returns only once the element queue is empty
+	}
+
+	@Override
 	public void close() throws Exception {
 		try {
-			assert(Thread.holdsLock(checkpointingLock));
-
-			while (!queue.isEmpty()) {
-				// wait for the emitter thread to output the remaining elements
-				// for that he needs the checkpointing lock and thus we have to free it
-				checkpointingLock.wait();
-			}
+			waitInFlightInputsFinished();
 		}
 		finally {
 			Exception exception = null;
@@ -409,6 +409,16 @@ public class AsyncWaitOperator<IN, OUT>
 		pendingStreamElementQueueEntry = null;
 	}
 
+	private void waitInFlightInputsFinished() throws InterruptedException {
+		assert(Thread.holdsLock(checkpointingLock));
+		// block the calling thread until the queue no longer holds any element
+		while (!queue.isEmpty()) {
+			// wait for the emitter thread to output the remaining elements; the emitter needs
+			// the checkpointing lock, which wait() releases for as long as this thread is blocked
+			checkpointingLock.wait();
+		}
+	}
+
 	@Override
 	public void failOperator(Throwable throwable) {
 		getContainingTask().getEnvironment().failExternally(throwable);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 05df362..159f2d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -223,6 +223,30 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	}
 
 	/**
+	 * AsyncFunction supports a specific delay(ms) before async invocation.
+	 */
+	private static class DelayedAsyncFunction extends MyAsyncFunction {
+		// delay in milliseconds that each invocation sleeps before completing its result future
+		private final long delayed;
+
+		public DelayedAsyncFunction(long delayed) {
+			this.delayed = delayed;
+		}
+		// completes the future with twice the input after the delay, or exceptionally when interrupted
+		@Override
+		public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
+			executorService.submit(() -> {
+				try {
+					Thread.sleep(delayed);
+					resultFuture.complete(Collections.singletonList(input * 2));
+				} catch (InterruptedException e) {
+					resultFuture.completeExceptionally(e); // interrupted: report the failure only, never a result
+				}
+			});
+		}
+	}
+
+	/**
 	 * A special {@link LazyAsyncFunction} for timeout handling.
 	 * Complete the result future with 3 times the input when the timeout occurred.
 	 */
@@ -1176,4 +1200,48 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		return in.transform("async wait operator", outTypeInfo, operator);
 	}
+
+	/**
+	 * Pushes records through a deliberately slow async function and verifies that ending the input blocks until every result has been emitted.
+	 */
+	@Test
+	public void testEndInput() throws Exception {
+		final long baseTime = 0L;
+		// the async function doubles each input value; timestamps and the watermark are expected unchanged
+		final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+		expected.add(new StreamRecord<>(2, baseTime + 1));
+		expected.add(new StreamRecord<>(4, baseTime + 2));
+		expected.add(new Watermark(baseTime + 2));
+		expected.add(new StreamRecord<>(6, baseTime + 3));
+
+		final AsyncWaitOperator<Integer, Integer> asyncOperator = new AsyncWaitOperator<>(
+			new DelayedAsyncFunction(10),
+			-1,
+			2,
+			AsyncDataStream.OutputMode.ORDERED);
+		final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+			new OneInputStreamOperatorTestHarness<>(asyncOperator, IntSerializer.INSTANCE);
+
+		harness.open();
+
+		try {
+			synchronized (harness.getCheckpointLock()) {
+				harness.processElement(new StreamRecord<>(1, baseTime + 1));
+				harness.processElement(new StreamRecord<>(2, baseTime + 2));
+				harness.processWatermark(new Watermark(baseTime + 2));
+				harness.processElement(new StreamRecord<>(3, baseTime + 3));
+			}
+
+			// endInput must not return before all buffered elements have been emitted
+			synchronized (harness.getCheckpointLock()) {
+				harness.endInput();
+			}
+
+			TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expected, harness.getOutput());
+		} finally {
+			synchronized (harness.getCheckpointLock()) {
+				harness.close();
+			}
+		}
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 0155198..c8edea6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -130,4 +131,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 	public long getCurrentWatermark() {
 		return currentWatermark;
 	}
+
+	public void endInput() throws Exception {
+		// reject operators that do not implement BoundedOneInput, otherwise forward the call
+		if (!(oneInputOperator instanceof BoundedOneInput)) {
+			throw new UnsupportedOperationException("The operator is not BoundedOneInput");
+		}
+		((BoundedOneInput) oneInputOperator).endInput();
+	}
 }