You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/01 10:17:07 UTC
[flink] branch release-1.9 updated: [FLINK-13491][datastream]
correctly support endInput in AsyncWaitOperator
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new be11caa [FLINK-13491][datastream] correctly support endInput in AsyncWaitOperator
be11caa is described below
commit be11caa3728ca5fd87d53e5beaa26e1e958f4efa
Author: Biao Liu <mm...@gmail.com>
AuthorDate: Thu Aug 1 18:16:14 2019 +0800
[FLINK-13491][datastream] correctly support endInput in AsyncWaitOperator
Before completing endInput, the operator has to make sure that all of the records have been emitted.
---
.../api/operators/async/AsyncWaitOperator.java | 26 ++++++---
.../api/operators/async/AsyncWaitOperatorTest.java | 68 ++++++++++++++++++++++
.../util/OneInputStreamOperatorTestHarness.java | 9 +++
3 files changed, 95 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index f875775..3ae6abd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -76,7 +77,7 @@ import java.util.concurrent.TimeUnit;
@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT>, OperatorActions {
+ implements OneInputStreamOperator<IN, OUT>, OperatorActions, BoundedOneInput {
private static final long serialVersionUID = 1L;
private static final String STATE_NAME = "_async_wait_operator_state_";
@@ -273,15 +274,14 @@ public class AsyncWaitOperator<IN, OUT>
}
+ /**
+ * Blocks until every element still held in the async queue has been emitted, so that no
+ * results are outstanding once end of input has been processed. Must be invoked while holding
+ * the checkpointing lock, which {@code waitInFlightInputsFinished()} asserts.
+ */
@Override
+ public void endInput() throws Exception {
+ waitInFlightInputsFinished();
+ }
+
+ @Override
public void close() throws Exception {
try {
- assert(Thread.holdsLock(checkpointingLock));
-
- while (!queue.isEmpty()) {
- // wait for the emitter thread to output the remaining elements
- // for that he needs the checkpointing lock and thus we have to free it
- checkpointingLock.wait();
- }
+ waitInFlightInputsFinished();
}
finally {
Exception exception = null;
@@ -409,6 +409,16 @@ public class AsyncWaitOperator<IN, OUT>
pendingStreamElementQueueEntry = null;
}
+ /**
+ * Waits until the element queue is empty, i.e. until the emitter thread has output all of the
+ * remaining elements. Shared by {@code endInput()} and {@code close()}.
+ *
+ * <p>The caller must hold {@code checkpointingLock} (asserted below). {@link Object#wait()}
+ * releases that lock while blocked, which lets the emitter thread acquire it.
+ *
+ * <p>NOTE(review): there is no timeout here; this relies on the emitter thread eventually
+ * draining the queue and notifying the lock — confirm it cannot hang if the emitter fails.
+ *
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ */
+ private void waitInFlightInputsFinished() throws InterruptedException {
+ assert(Thread.holdsLock(checkpointingLock));
+
+ while (!queue.isEmpty()) {
+ // wait for the emitter thread to output the remaining elements;
+ // it needs the checkpointing lock to do so, so we have to release it while waiting
+ checkpointingLock.wait();
+ }
+ }
+
@Override
public void failOperator(Throwable throwable) {
getContainingTask().getEnvironment().failExternally(throwable);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 05df362..159f2d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -223,6 +223,30 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
/**
+ * AsyncFunction that, on the executor, sleeps for a fixed delay (in milliseconds) before
+ * completing the result future with twice the input.
+ */
+ private static class DelayedAsyncFunction extends MyAsyncFunction {
+
+ /** Sleep time in milliseconds before each result is produced. */
+ private final long delayed;
+
+ public DelayedAsyncFunction(long delayed) {
+ this.delayed = delayed;
+ }
+
+ @Override
+ public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
+ executorService.submit(() -> {
+ try {
+ Thread.sleep(delayed);
+ } catch (InterruptedException e) {
+ // Preserve the interrupt for the executor and fail the future.
+ // Do not fall through and also complete it normally.
+ Thread.currentThread().interrupt();
+ resultFuture.completeExceptionally(e);
+ return;
+ }
+ resultFuture.complete(Collections.singletonList(input * 2));
+ });
+ }
+ }
+
+ /**
* A special {@link LazyAsyncFunction} for timeout handling.
* Complete the result future with 3 times the input when the timeout occurred.
*/
@@ -1176,4 +1200,48 @@ public class AsyncWaitOperatorTest extends TestLogger {
return in.transform("async wait operator", outTypeInfo, operator);
}
+
+ /**
+ * Checks that endInput() blocks until all buffered elements have been emitted. Every async
+ * invocation is delayed, so the output is complete before close() only if endInput() waited.
+ */
+ @Test
+ public void testEndInput() throws Exception {
+ final long baseTime = 0L;
+
+ final AsyncWaitOperator<Integer, Integer> asyncOperator = new AsyncWaitOperator<>(
+ new DelayedAsyncFunction(10),
+ -1,
+ 2,
+ AsyncDataStream.OutputMode.ORDERED);
+ final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+ new OneInputStreamOperatorTestHarness<>(asyncOperator, IntSerializer.INSTANCE);
+
+ // Expected: doubled inputs in input order, with the watermark kept in its original position.
+ final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ expected.add(new StreamRecord<>(2, baseTime + 1));
+ expected.add(new StreamRecord<>(4, baseTime + 2));
+ expected.add(new Watermark(baseTime + 2));
+ expected.add(new StreamRecord<>(6, baseTime + 3));
+
+ harness.open();
+
+ try {
+ synchronized (harness.getCheckpointLock()) {
+ for (int value = 1; value <= 2; value++) {
+ harness.processElement(new StreamRecord<>(value, baseTime + value));
+ }
+ harness.processWatermark(new Watermark(baseTime + 2));
+ harness.processElement(new StreamRecord<>(3, baseTime + 3));
+ }
+
+ // endInput() must not return before every in-flight result has been emitted.
+ synchronized (harness.getCheckpointLock()) {
+ harness.endInput();
+ }
+
+ TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expected, harness.getOutput());
+ } finally {
+ synchronized (harness.getCheckpointLock()) {
+ harness.close();
+ }
+ }
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 0155198..c8edea6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -130,4 +131,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
public long getCurrentWatermark() {
return currentWatermark;
}
+
+ /**
+ * Signals end of input to the wrapped operator.
+ *
+ * @throws UnsupportedOperationException if the operator does not implement {@link BoundedOneInput}
+ */
+ public void endInput() throws Exception {
+ if (!(oneInputOperator instanceof BoundedOneInput)) {
+ throw new UnsupportedOperationException("The operator is not BoundedOneInput");
+ }
+ final BoundedOneInput boundedOperator = (BoundedOneInput) oneInputOperator;
+ boundedOperator.endInput();
+ }
}