You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/25 15:16:51 UTC

flink git commit: [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators

Repository: flink
Updated Branches:
  refs/heads/master cf9f4c77c -> a811545ec


[FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators

This PR fixes the deadlock by changing the Emitter's behaviour to first output the
element before removing it from the StreamElementQueue. That way, the close method also waits
until the Emitter has output the last completed element. Additionally, the
stopResources method now temporarily releases the checkpoint lock (by waiting on it) so that
the emitter thread can react to the interrupt signal.

This closes #3209.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a811545e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a811545e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a811545e

Branch: refs/heads/master
Commit: a811545ecae762d3d2fd34e5d554c010ccd8b539
Parents: cf9f4c7
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 25 14:11:48 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 25 16:16:30 2017 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java  |  14 +-
 .../streaming/api/operators/async/Emitter.java  |  28 ++--
 .../queue/UnorderedStreamElementQueue.java      |   2 +-
 .../operators/async/AsyncWaitOperatorTest.java  | 131 +++++++++++++++++++
 4 files changed, 158 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a811545e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index f43f8b9..6793620 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -89,7 +89,7 @@ public class AsyncWaitOperator<IN, OUT>
 	/** Timeout for the async collectors */
 	private final long timeout;
 
-	private transient Object checkpointingLock;
+	protected transient Object checkpointingLock;
 
 	/** {@link TypeSerializer} for inputs while making snapshots. */
 	private transient StreamElementSerializer<IN> inStreamElementSerializer;
@@ -189,7 +189,7 @@ public class AsyncWaitOperator<IN, OUT>
 		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
 
 		// start the emitter thread
-		this.emitterThread = new Thread(emitter);
+		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
 		emitterThread.setDaemon(true);
 		emitterThread.start();
 
@@ -356,6 +356,16 @@ public class AsyncWaitOperator<IN, OUT>
 				Thread.currentThread().interrupt();
 			}
 
+			/**
+			 * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
+			 * that the emitter thread can complete/react to the interrupt signal.
+			 */
+			if (Thread.holdsLock(checkpointingLock)) {
+				while (emitterThread.isAlive()) {
+					checkpointingLock.wait(100L);
+				}
+			}
+
 			emitterThread.join();
 		} else {
 			executor.shutdownNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/a811545e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
index a07abe1..39be544 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -89,7 +89,7 @@ public class Emitter<OUT> implements Runnable {
 			} else {
 				// Thread got interrupted which means that it should shut down
 				LOG.debug("Emitter thread got interrupted. This indicates that the emitter should " +
-					"shut down.");
+					"shut down.", e);
 			}
 		} catch (Throwable t) {
 			operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
@@ -100,6 +100,11 @@ public class Emitter<OUT> implements Runnable {
 	private void output(AsyncResult asyncResult) throws InterruptedException {
 		if (asyncResult.isWatermark()) {
 			synchronized (checkpointLock) {
+				AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
+
+				LOG.debug("Output async watermark.");
+				output.emitWatermark(asyncWatermarkResult.getWatermark());
+
 				// remove the peeked element from the async collector buffer so that it is no longer
 				// checkpointed
 				streamElementQueue.poll();
@@ -107,11 +112,6 @@ public class Emitter<OUT> implements Runnable {
 				// notify the main thread that there is again space left in the async collector
 				// buffer
 				checkpointLock.notifyAll();
-
-				AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
-
-				LOG.debug("Output async watermark.");
-				output.emitWatermark(asyncWatermarkResult.getWatermark());
 			}
 		} else {
 			AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
@@ -123,14 +123,6 @@ public class Emitter<OUT> implements Runnable {
 			}
 
 			synchronized (checkpointLock) {
-				// remove the peeked element from the async collector buffer so that it is no longer
-				// checkpointed
-				streamElementQueue.poll();
-
-				// notify the main thread that there is again space left in the async collector
-				// buffer
-				checkpointLock.notifyAll();
-
 				LOG.debug("Output async stream element collection result.");
 
 				try {
@@ -146,6 +138,14 @@ public class Emitter<OUT> implements Runnable {
 						new Exception("An async function call terminated with an exception. " +
 							"Failing the AsyncWaitOperator.", e));
 				}
+
+				// remove the peeked element from the async collector buffer so that it is no longer
+				// checkpointed
+				streamElementQueue.poll();
+
+				// notify the main thread that there is again space left in the async collector
+				// buffer
+				checkpointLock.notifyAll();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a811545e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index f244008..396dbe8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -128,7 +128,7 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
 
 				return true;
 			} else {
-				LOG.debug("Failed to put element into ordered stream element queue because it " +
+				LOG.debug("Failed to put element into unordered stream element queue because it " +
 					"was full ({}/{}).", numberEntries, capacity);
 
 				return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/a811545e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 10df6d7..4558e06 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -41,10 +41,12 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -52,12 +54,17 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.ArrayDeque;
 import java.util.Collections;
@@ -73,7 +80,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -670,4 +683,122 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		Assert.assertNotNull(failureCause.getCause().getCause());
 		Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException);
 	}
+
+	/**
+	 * Test case for FLINK-5638: Tests that the async wait operator can be closed while its emitter
+	 * is blocked in {@link Output#collect} waiting on the checkpoint lock (e.g. in the case of two
+	 * chained async wait operators where the latter operator's queue is currently full).
+	 *
+	 * <p>Note that this test cannot enforce the exact interleaving once the fix is in place.
+	 * Without the fix, however, it provokes the deadlock described above.
+	 */
+	@Test(timeout = 10000L)
+	public void testClosingWithBlockedEmitter() throws Exception {
+		final Object lock = new Object();
+
+		ArgumentCaptor<Throwable> failureReason = ArgumentCaptor.forClass(Throwable.class);
+
+		Environment environment = mock(Environment.class);
+		when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
+		when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
+		when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
+		when(environment.getTaskInfo()).thenReturn(new TaskInfo(
+			"testTask",
+			1,
+			0,
+			1,
+			0));
+		doNothing().when(environment).failExternally(failureReason.capture());
+
+		StreamTask<?, ?> containingTask = mock(StreamTask.class);
+		when(containingTask.getEnvironment()).thenReturn(environment);
+		when(containingTask.getCheckpointLock()).thenReturn(lock);
+		when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService());
+
+		StreamConfig streamConfig = mock(StreamConfig.class);
+		doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));
+
+		final OneShotLatch closingLatch = new OneShotLatch();
+		final OneShotLatch outputLatch = new OneShotLatch();
+
+		@SuppressWarnings("unchecked") Output<StreamRecord<Integer>> output = mock(Output.class);
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				assertTrue("Output should happen under the checkpoint lock.", Thread.holdsLock(lock));
+
+				outputLatch.trigger();
+
+				// release the lock via wait() until close() has been entered (loop guards spurious wakeups)
+				while (!closingLatch.isTriggered()) {
+					lock.wait();
+				}
+
+				return null;
+			}
+		}).when(output).collect(any(StreamRecord.class));
+
+		AsyncWaitOperator<Integer, Integer> operator = new TestAsyncWaitOperator<>(
+			new MyAsyncFunction(),
+			1000L,
+			1,
+			AsyncDataStream.OutputMode.ORDERED,
+			closingLatch);
+
+		operator.setup(
+			containingTask,
+			streamConfig,
+			output);
+
+		operator.open();
+
+		synchronized (lock) {
+			operator.processElement(new StreamRecord<>(42));
+		}
+
+		outputLatch.await();
+
+		synchronized (lock) {
+			operator.close();
+		}
+
+		// check that the task was not failed externally (e.g. by an exception in the emitter thread)
+		try {
+			verify(environment, never()).failExternally(any(Throwable.class));
+		} catch (Error e) {
+			// add the exception reported from the emitter thread (root cause) as a suppressed
+			// exception
+			e.addSuppressed(failureReason.getValue());
+			throw e;
+		}
+	}
+
+	/**
+	 * Testing async wait operator whose {@link #close()} triggers the closing latch and notifies all
+	 * threads waiting on the checkpoint lock before delegating to {@link AsyncWaitOperator#close()}.
+	 */
+	private static final class TestAsyncWaitOperator<IN, OUT> extends AsyncWaitOperator<IN, OUT> {
+
+		private static final long serialVersionUID = -8528791694746625560L;
+
+		private final transient OneShotLatch closingLatch; // transient: not part of the serialized operator
+
+		public TestAsyncWaitOperator(
+				AsyncFunction<IN, OUT> asyncFunction,
+				long timeout,
+				int capacity,
+				AsyncDataStream.OutputMode outputMode,
+				OneShotLatch closingLatch) {
+			super(asyncFunction, timeout, capacity, outputMode);
+
+			this.closingLatch = Preconditions.checkNotNull(closingLatch);
+		}
+
+		@Override
+		public void close() throws Exception {
+			closingLatch.trigger(); // mark that close() has been entered (must happen before the notify)
+			checkpointingLock.notifyAll(); // wake lock waiters; caller must hold checkpointingLock
+			super.close();
+		}
+	}
 }