You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:17 UTC
[12/16] flink git commit: [FLINK-6435] [async] React to exceptionally
completed StreamElementQueueEntry
[FLINK-6435] [async] React to exceptionally completed StreamElementQueueEntry
The AsyncWaitOperator should not only react to orderly completed
StreamElementQueueEntries but also to those completed with a user exception
or those which timed out.
This PR fixes the problem by calling the onComplete function passed to
StreamElementQueueEntry#onComplete also in the exceptional case.
This closes #3814.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93758082
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93758082
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93758082
Branch: refs/heads/master
Commit: 93758082273618d9fdbb3a9b3ed916a4b637760f
Parents: ddd6a99
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 3 14:40:46 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:08:09 2017 +0200
----------------------------------------------------------------------
.../async/queue/StreamElementQueueEntry.java | 9 +-
.../operators/async/AsyncWaitOperatorTest.java | 196 +++++++++++++++----
2 files changed, 161 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/93758082/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
index 66872df..4a50201 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators.async.queue;
import java.util.concurrent.Executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.util.Preconditions;
@@ -65,10 +66,14 @@ public abstract class StreamElementQueueEntry<T> implements AsyncResult {
Executor executor) {
final StreamElementQueueEntry<T> thisReference = this;
- getFuture().thenAcceptAsync(new AcceptFunction<T>() {
+ getFuture().handleAsync(new BiFunction<T, Throwable, Void>() {
@Override
- public void accept(T value) {
+ public Void apply(T t, Throwable throwable) {
+ // call the complete function for normal completion as well as exceptional completion
+ // see FLINK-6435
completeFunction.accept(thisReference);
+
+ return null;
}
}, executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/93758082/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index c4867ff..1d83229 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -71,6 +71,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import javax.annotation.Nonnull;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Comparator;
@@ -90,13 +91,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
/**
* Tests for {@link AsyncWaitOperator}. These test that:
@@ -173,7 +168,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
/**
- * A special {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} without issuing
+ * A special {@link AsyncFunction} without issuing
* {@link AsyncCollector#collect} until the latch counts to zero.
* This function is used in the testStateSnapshotAndRestore, ensuring
* that {@link StreamElementQueueEntry} can stay
@@ -637,20 +632,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
2,
AsyncDataStream.OutputMode.ORDERED);
- final Environment mockEnvironment = mock(Environment.class);
-
- final Configuration taskConfiguration = new Configuration();
- final ExecutionConfig executionConfig = new ExecutionConfig();
- final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
- final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
- final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
-
- when(mockEnvironment.getTaskConfiguration()).thenReturn(taskConfiguration);
- when(mockEnvironment.getExecutionConfig()).thenReturn(executionConfig);
- when(mockEnvironment.getMetricGroup()).thenReturn(metricGroup);
- when(mockEnvironment.getTaskManagerInfo()).thenReturn(taskManagerRuntimeInfo);
- when(mockEnvironment.getTaskInfo()).thenReturn(taskInfo);
- when(mockEnvironment.getUserClassLoader()).thenReturn(AsyncWaitOperatorTest.class.getClassLoader());
+ final Environment mockEnvironment = createMockEnvironment();
final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment);
@@ -696,6 +678,25 @@ public class AsyncWaitOperatorTest extends TestLogger {
Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException);
}
+ @Nonnull
+ private Environment createMockEnvironment() {
+ final Environment mockEnvironment = mock(Environment.class);
+
+ final Configuration taskConfiguration = new Configuration();
+ final ExecutionConfig executionConfig = new ExecutionConfig();
+ final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
+ final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
+ final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
+
+ when(mockEnvironment.getTaskConfiguration()).thenReturn(taskConfiguration);
+ when(mockEnvironment.getExecutionConfig()).thenReturn(executionConfig);
+ when(mockEnvironment.getMetricGroup()).thenReturn(metricGroup);
+ when(mockEnvironment.getTaskManagerInfo()).thenReturn(taskManagerRuntimeInfo);
+ when(mockEnvironment.getTaskInfo()).thenReturn(taskInfo);
+ when(mockEnvironment.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
+ return mockEnvironment;
+ }
+
/**
* Test case for FLINK-5638: Tests that the async wait operator can be closed even if the
* emitter is currently waiting on the checkpoint lock (e.g. in the case of two chained async
@@ -710,16 +711,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
ArgumentCaptor<Throwable> failureReason = ArgumentCaptor.forClass(Throwable.class);
- Environment environment = mock(Environment.class);
- when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
- when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
- when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
- when(environment.getTaskInfo()).thenReturn(new TaskInfo(
- "testTask",
- 1,
- 0,
- 1,
- 0));
+ Environment environment = createMockEnvironment();
doNothing().when(environment).failExternally(failureReason.capture());
StreamTask<?, ?> containingTask = mock(StreamTask.class);
@@ -827,16 +819,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
final long timeout = 100000L;
final long timestamp = 1L;
- Environment environment = mock(Environment.class);
- when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
- when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
- when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
- when(environment.getTaskInfo()).thenReturn(new TaskInfo(
- "testTask",
- 1,
- 0,
- 1,
- 0));
+ Environment environment = createMockEnvironment();
ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
@@ -893,4 +876,133 @@ public class AsyncWaitOperatorTest extends TestLogger {
// check that we have cancelled our registered timeout
verify(scheduledFuture).cancel(eq(true));
}
+
+ /**
+ * FLINK-6435
+ *
+ * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait until
+ * another StreamElementQueueEntry is properly completed before it is collected.
+ */
+ @Test(timeout = 2000)
+ public void testOrderedWaitUserExceptionHandling() throws Exception {
+ testUserExceptionHandling(AsyncDataStream.OutputMode.ORDERED);
+ }
+
+ /**
+ * FLINK-6435
+ *
+ * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait until
+ * another StreamElementQueueEntry is properly completed before it is collected.
+ */
+ @Test(timeout = 2000)
+ public void testUnorderedWaitUserExceptionHandling() throws Exception {
+ testUserExceptionHandling(AsyncDataStream.OutputMode.UNORDERED);
+ }
+
+ private void testUserExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
+ UserExceptionAsyncFunction asyncWaitFunction = new UserExceptionAsyncFunction();
+ long timeout = 2000L;
+
+ AsyncWaitOperator<Integer, Integer> asyncWaitOperator = new AsyncWaitOperator<>(
+ asyncWaitFunction,
+ TIMEOUT,
+ 2,
+ outputMode);
+
+ final Environment mockEnvironment = createMockEnvironment();
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>(
+ asyncWaitOperator,
+ IntSerializer.INSTANCE,
+ mockEnvironment);
+
+ harness.open();
+
+ synchronized (harness.getCheckpointLock()) {
+ harness.processElement(1, 1L);
+ }
+
+ verify(harness.getEnvironment(), timeout(timeout)).failExternally(any(Exception.class));
+
+ synchronized (harness.getCheckpointLock()) {
+ harness.close();
+ }
+ }
+
+ /**
+ * AsyncFunction which completes the result with an {@link Exception}.
+ */
+ private static class UserExceptionAsyncFunction implements AsyncFunction<Integer, Integer> {
+
+ private static final long serialVersionUID = 6326568632967110990L;
+
+ @Override
+ public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+ collector.collect(new Exception("Test exception"));
+ }
+ }
+
+ /**
+ * FLINK-6435
+ *
+ * Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that
+ * a StreamElementQueueEntry is completed in case of a timeout exception.
+ */
+ @Test
+ public void testOrderedWaitTimeoutHandling() throws Exception {
+ testTimeoutExceptionHandling(AsyncDataStream.OutputMode.ORDERED);
+ }
+
+ /**
+ * FLINK-6435
+ *
+ * Tests that timeout exceptions are properly handled in unordered output mode. The proper handling means that
+ * a StreamElementQueueEntry is completed in case of a timeout exception.
+ */
+ @Test
+ public void testUnorderedWaitTimeoutHandling() throws Exception {
+ testTimeoutExceptionHandling(AsyncDataStream.OutputMode.UNORDERED);
+ }
+
+ private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
+ AsyncFunction<Integer, Integer> asyncFunction = new NoOpAsyncFunction<>();
+ long timeout = 10L; // 10 milliseconds
+
+ AsyncWaitOperator<Integer, Integer> asyncWaitOperator = new AsyncWaitOperator<>(
+ asyncFunction,
+ timeout,
+ 2,
+ outputMode);
+
+ final Environment mockenvironment = createMockEnvironment();
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>(
+ asyncWaitOperator,
+ IntSerializer.INSTANCE,
+ mockenvironment);
+
+ harness.open();
+
+ synchronized (harness.getCheckpointLock()) {
+ harness.processElement(1, 1L);
+ }
+
+ harness.setProcessingTime(10L);
+
+ verify(harness.getEnvironment(), timeout(100L * timeout)).failExternally(any(Exception.class));
+
+ synchronized (harness.getCheckpointLock()) {
+ harness.close();
+ }
+ }
+
+ private static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> {
+ private static final long serialVersionUID = -3060481953330480694L;
+
+ @Override
+ public void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception {
+ // no op
+ }
+ }
+
}