You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:17 UTC

[12/16] flink git commit: [FLINK-6435] [async] React to exceptionally completed StreamElementQueueEntry

[FLINK-6435] [async] React to exceptionally completed StreamElementQueueEntry

The AsyncWaitOperator should not only react to orderly completed
StreamElementQueueEntries but also to those completed with a user exception
or those which timed out.

This PR fixes the problem by calling the onComplete function passed to
StreamElementQueueEntry#onComplete also in the exceptional case.

This closes #3814.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93758082
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93758082
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93758082

Branch: refs/heads/master
Commit: 93758082273618d9fdbb3a9b3ed916a4b637760f
Parents: ddd6a99
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 3 14:40:46 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:08:09 2017 +0200

----------------------------------------------------------------------
 .../async/queue/StreamElementQueueEntry.java    |   9 +-
 .../operators/async/AsyncWaitOperatorTest.java  | 196 +++++++++++++++----
 2 files changed, 161 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93758082/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
index 66872df..4a50201 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators.async.queue;
 import java.util.concurrent.Executor;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.util.Preconditions;
@@ -65,10 +66,14 @@ public abstract class StreamElementQueueEntry<T> implements AsyncResult {
 			Executor executor) {
 		final StreamElementQueueEntry<T> thisReference = this;
 
-		getFuture().thenAcceptAsync(new AcceptFunction<T>() {
+		getFuture().handleAsync(new BiFunction<T, Throwable, Void>() {
 			@Override
-			public void accept(T value) {
+			public Void apply(T t, Throwable throwable) {
+				// call the complete function for normal completion as well as exceptional completion
+				// see FLINK-6435
 				completeFunction.accept(thisReference);
+
+				return null;
 			}
 		}, executor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/93758082/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index c4867ff..1d83229 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -71,6 +71,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nonnull;
 import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Comparator;
@@ -90,13 +91,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 /**
  * Tests for {@link AsyncWaitOperator}. These test that:
@@ -173,7 +168,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	}
 
 	/**
-	 * A special {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} without issuing
+	 * A special {@link AsyncFunction} without issuing
 	 * {@link AsyncCollector#collect} until the latch counts to zero.
 	 * This function is used in the testStateSnapshotAndRestore, ensuring
 	 * that {@link StreamElementQueueEntry} can stay
@@ -637,20 +632,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			2,
 			AsyncDataStream.OutputMode.ORDERED);
 
-		final Environment mockEnvironment = mock(Environment.class);
-
-		final Configuration taskConfiguration = new Configuration();
-		final ExecutionConfig executionConfig = new ExecutionConfig();
-		final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
-		final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
-		final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
-
-		when(mockEnvironment.getTaskConfiguration()).thenReturn(taskConfiguration);
-		when(mockEnvironment.getExecutionConfig()).thenReturn(executionConfig);
-		when(mockEnvironment.getMetricGroup()).thenReturn(metricGroup);
-		when(mockEnvironment.getTaskManagerInfo()).thenReturn(taskManagerRuntimeInfo);
-		when(mockEnvironment.getTaskInfo()).thenReturn(taskInfo);
-		when(mockEnvironment.getUserClassLoader()).thenReturn(AsyncWaitOperatorTest.class.getClassLoader());
+		final Environment mockEnvironment = createMockEnvironment();
 
 		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 			new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment);
@@ -696,6 +678,25 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException);
 	}
 
+	@Nonnull
+	private Environment createMockEnvironment() {
+		final Environment mockEnvironment = mock(Environment.class);
+
+		final Configuration taskConfiguration = new Configuration();
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
+		final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
+		final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
+
+		when(mockEnvironment.getTaskConfiguration()).thenReturn(taskConfiguration);
+		when(mockEnvironment.getExecutionConfig()).thenReturn(executionConfig);
+		when(mockEnvironment.getMetricGroup()).thenReturn(metricGroup);
+		when(mockEnvironment.getTaskManagerInfo()).thenReturn(taskManagerRuntimeInfo);
+		when(mockEnvironment.getTaskInfo()).thenReturn(taskInfo);
+		when(mockEnvironment.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
+		return mockEnvironment;
+	}
+
 	/**
 	 * Test case for FLINK-5638: Tests that the async wait operator can be closed even if the
 	 * emitter is currently waiting on the checkpoint lock (e.g. in the case of two chained async
@@ -710,16 +711,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		ArgumentCaptor<Throwable> failureReason = ArgumentCaptor.forClass(Throwable.class);
 
-		Environment environment = mock(Environment.class);
-		when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
-		when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
-		when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
-		when(environment.getTaskInfo()).thenReturn(new TaskInfo(
-			"testTask",
-			1,
-			0,
-			1,
-			0));
+		Environment environment = createMockEnvironment();
 		doNothing().when(environment).failExternally(failureReason.capture());
 
 		StreamTask<?, ?> containingTask = mock(StreamTask.class);
@@ -827,16 +819,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		final long timeout = 100000L;
 		final long timestamp = 1L;
 
-		Environment environment = mock(Environment.class);
-		when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
-		when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
-		when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
-		when(environment.getTaskInfo()).thenReturn(new TaskInfo(
-			"testTask",
-			1,
-			0,
-			1,
-			0));
+		Environment environment = createMockEnvironment();
 
 		ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
 
@@ -893,4 +876,133 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		// check that we have cancelled our registered timeout
 		verify(scheduledFuture).cancel(eq(true));
 	}
+
+	/**
+	 * FLINK-6435
+	 *
+	 * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait until
+	 * another StreamElementQueueEntry is properly completed before it is collected.
+	 */
+	@Test(timeout = 2000)
+	public void testOrderedWaitUserExceptionHandling() throws Exception {
+		testUserExceptionHandling(AsyncDataStream.OutputMode.ORDERED);
+	}
+
+	/**
+	 * FLINK-6435
+	 *
+	 * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait until
+	 * another StreamElementQueueEntry is properly completed before it is collected.
+	 */
+	@Test(timeout = 2000)
+	public void testUnorderedWaitUserExceptionHandling() throws Exception {
+		testUserExceptionHandling(AsyncDataStream.OutputMode.UNORDERED);
+	}
+
+	private void testUserExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
+		UserExceptionAsyncFunction asyncWaitFunction = new UserExceptionAsyncFunction();
+		long timeout = 2000L;
+
+		AsyncWaitOperator<Integer, Integer> asyncWaitOperator = new AsyncWaitOperator<>(
+			asyncWaitFunction,
+			TIMEOUT,
+			2,
+			outputMode);
+
+		final Environment mockEnvironment = createMockEnvironment();
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>(
+			asyncWaitOperator,
+			IntSerializer.INSTANCE,
+			mockEnvironment);
+
+		harness.open();
+
+		synchronized (harness.getCheckpointLock()) {
+			harness.processElement(1, 1L);
+		}
+
+		verify(harness.getEnvironment(), timeout(timeout)).failExternally(any(Exception.class));
+
+		synchronized (harness.getCheckpointLock()) {
+			harness.close();
+		}
+	}
+
+	/**
+	 * AsyncFunction which completes the result with an {@link Exception}.
+	 */
+	private static class UserExceptionAsyncFunction implements AsyncFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 6326568632967110990L;
+
+		@Override
+		public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+			collector.collect(new Exception("Test exception"));
+		}
+	}
+
+	/**
+	 * FLINK-6435
+	 *
+	 * Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that
+	 * a StreamElementQueueEntry is completed in case of a timeout exception.
+	 */
+	@Test
+	public void testOrderedWaitTimeoutHandling() throws Exception {
+		testTimeoutExceptionHandling(AsyncDataStream.OutputMode.ORDERED);
+	}
+
+	/**
+	 * FLINK-6435
+	 *
+	 * Tests that timeout exceptions are properly handled in unordered output mode. The proper handling means that
+	 * a StreamElementQueueEntry is completed in case of a timeout exception.
+	 */
+	@Test
+	public void testUnorderedWaitTimeoutHandling() throws Exception {
+		testTimeoutExceptionHandling(AsyncDataStream.OutputMode.UNORDERED);
+	}
+
+	private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
+		AsyncFunction<Integer, Integer> asyncFunction = new NoOpAsyncFunction<>();
+		long timeout = 10L; // 10 milliseconds
+
+		AsyncWaitOperator<Integer, Integer> asyncWaitOperator = new AsyncWaitOperator<>(
+			asyncFunction,
+			timeout,
+			2,
+			outputMode);
+
+		final Environment mockenvironment = createMockEnvironment();
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>(
+			asyncWaitOperator,
+			IntSerializer.INSTANCE,
+			mockenvironment);
+
+		harness.open();
+
+		synchronized (harness.getCheckpointLock()) {
+			harness.processElement(1, 1L);
+		}
+
+		harness.setProcessingTime(10L);
+
+		verify(harness.getEnvironment(), timeout(100L * timeout)).failExternally(any(Exception.class));
+
+		synchronized (harness.getCheckpointLock()) {
+			harness.close();
+		}
+	}
+
+	private static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> {
+		private static final long serialVersionUID = -3060481953330480694L;
+
+		@Override
+		public void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception {
+			// no op
+		}
+	}
+
 }