Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/25 14:26:26 UTC

[1/3] flink git commit: [FLINK-7949] Add unit test for AsyncWaitOperator recovery with full queue

Repository: flink
Updated Branches:
  refs/heads/release-1.4 38278ebef -> 09edf6a62


[FLINK-7949] Add unit test for AsyncWaitOperator recovery with full queue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb088bc1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb088bc1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb088bc1

Branch: refs/heads/release-1.4
Commit: fb088bc1343099a1ea71d0589ab825897e8dcdee
Parents: a2198b0
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 10 18:53:38 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:26:09 2018 +0100

----------------------------------------------------------------------
 .../operators/async/AsyncWaitOperatorTest.java  | 123 ++++++++++++++++++-
 1 file changed, 122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb088bc1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 993bffb..34c9a0f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -56,6 +57,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AcknowledgeStreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -65,6 +67,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -74,10 +77,12 @@ import org.mockito.stubbing.Answer;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -88,6 +93,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -970,6 +976,122 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the AsyncWaitOperator can restart if the checkpointed queue was full.
+	 *
+	 * <p>See FLINK-7949
+	 */
+	@Test(timeout = 10000)
+	public void testRestartWithFullQueue() throws Exception {
+		int capacity = 10;
+
+		// 1. create the snapshot which contains capacity + 1 elements
+		final CompletableFuture<Void> trigger = new CompletableFuture<>();
+		final ControllableAsyncFunction<Integer> controllableAsyncFunction = new ControllableAsyncFunction<>(trigger);
+
+		final OneInputStreamOperatorTestHarness<Integer, Integer> snapshotHarness = new OneInputStreamOperatorTestHarness<>(
+			new AsyncWaitOperator<>(
+				controllableAsyncFunction, // the ControllableAsyncFunction blocks until the trigger future is completed
+				1000L,
+				capacity,
+				AsyncDataStream.OutputMode.ORDERED),
+			IntSerializer.INSTANCE);
+
+		snapshotHarness.open();
+
+		final OperatorStateHandles snapshot;
+
+		final ArrayList<Integer> expectedOutput = new ArrayList<>(capacity + 1);
+
+		try {
+			synchronized (snapshotHarness.getCheckpointLock()) {
+				for (int i = 0; i < capacity; i++) {
+					snapshotHarness.processElement(i, 0L);
+					expectedOutput.add(i);
+				}
+			}
+
+			expectedOutput.add(capacity);
+
+			final OneShotLatch lastElement = new OneShotLatch();
+
+			final CheckedThread lastElementWriter = new CheckedThread() {
+				@Override
+				public void go() throws Exception {
+					synchronized (snapshotHarness.getCheckpointLock()) {
+						lastElement.trigger();
+						snapshotHarness.processElement(capacity, 0L);
+					}
+				}
+			};
+
+			lastElementWriter.start();
+
+			lastElement.await();
+
+			synchronized (snapshotHarness.getCheckpointLock()) {
+				// execute the snapshot within the checkpoint lock, because then it is guaranteed
+				// that the lastElementWriter has written the exceeding element
+				snapshot = snapshotHarness.snapshot(0L, 0L);
+			}
+
+			// trigger the computation to make the close call finish
+			trigger.complete(null);
+		} finally {
+			synchronized (snapshotHarness.getCheckpointLock()) {
+				snapshotHarness.close();
+			}
+		}
+
+		// 2. restore the snapshot and check that we complete
+		final OneInputStreamOperatorTestHarness<Integer, Integer> recoverHarness = new OneInputStreamOperatorTestHarness<>(
+			new AsyncWaitOperator<>(
+				new ControllableAsyncFunction<>(CompletableFuture.completedFuture(null)),
+				1000L,
+				capacity,
+				AsyncDataStream.OutputMode.ORDERED),
+			IntSerializer.INSTANCE);
+
+		recoverHarness.initializeState(snapshot);
+
+		synchronized (recoverHarness.getCheckpointLock()) {
+			recoverHarness.open();
+		}
+
+		synchronized (recoverHarness.getCheckpointLock()) {
+			recoverHarness.close();
+		}
+
+		final ConcurrentLinkedQueue<Object> output = recoverHarness.getOutput();
+
+		assertThat(output.size(), Matchers.equalTo(capacity + 1));
+
+		final ArrayList<Integer> outputElements = new ArrayList<>(capacity + 1);
+
+		for (int i = 0; i < capacity + 1; i++) {
+			StreamRecord<Integer> streamRecord = ((StreamRecord<Integer>) output.poll());
+			outputElements.add(streamRecord.getValue());
+		}
+
+		assertThat(outputElements, Matchers.equalTo(expectedOutput));
+	}
+
+	private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {
+
+		private static final long serialVersionUID = -4214078239267288636L;
+
+		private transient CompletableFuture<Void> trigger;
+
+		private ControllableAsyncFunction(CompletableFuture<Void> trigger) {
+			this.trigger = Preconditions.checkNotNull(trigger);
+		}
+
+		@Override
+		public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
+			trigger.thenAccept(v -> resultFuture.complete(Collections.singleton(input)));
+		}
+	}
+
 	private static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> {
 		private static final long serialVersionUID = -3060481953330480694L;
 
@@ -978,5 +1100,4 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			// no op
 		}
 	}
-
 }


[2/3] flink git commit: [FLINK-7949] Make AsyncWaitOperator recoverable also when queue is full

Posted by tr...@apache.org.
[FLINK-7949] Make AsyncWaitOperator recoverable also when queue is full

Start the emitter thread BEFORE filling up the queue of recovered elements.
This guarantees that we won't deadlock while inserting the recovered elements,
because the emitter can already start processing them.

This closes #4924.
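
To make the deadlock argument concrete, here is a minimal, self-contained sketch of the same ordering constraint. It uses a plain java.util.concurrent.ArrayBlockingQueue and a hand-rolled consumer thread rather than Flink's actual StreamElementQueue and Emitter classes, so all names below are illustrative only: a snapshot can hold capacity + 1 elements, and re-inserting them would block forever unless the consumer is already running.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RecoveryOrderingSketch {

	public static void main(String[] args) throws InterruptedException {
		final int capacity = 10;
		final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

		// start the consumer first, analogous to starting the emitter thread
		// at the beginning of AsyncWaitOperator#open()
		final Thread emitter = new Thread(() -> {
			try {
				while (true) {
					queue.take(); // drain elements so producers never block indefinitely
				}
			} catch (InterruptedException ignored) {
				// shutting down
			}
		}, "emitter");
		emitter.setDaemon(true);
		emitter.start();

		// re-insert capacity + 1 "recovered" elements; without a running consumer
		// the last put() would block forever
		for (int i = 0; i <= capacity; i++) {
			queue.put(i);
		}

		System.out.println("all recovered elements inserted without deadlock");
		emitter.interrupt();
	}
}

This is exactly the reordering the patch below makes in AsyncWaitOperator#open(): the emitter is created and started before the recovered elements are re-enqueued.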


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2198b04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2198b04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2198b04

Branch: refs/heads/release-1.4
Commit: a2198b04712b8ec6105999414f33781c6efcf4a9
Parents: 38278eb
Author: Bartłomiej Tartanus <ba...@gmail.com>
Authored: Mon Oct 30 15:39:43 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:26:09 2018 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java      | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2198b04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index aec20c0..a7b9438 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -163,6 +163,14 @@ public class AsyncWaitOperator<IN, OUT>
 	public void open() throws Exception {
 		super.open();
 
+		// create the emitter
+		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
+
+		// start the emitter thread
+		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
+		emitterThread.setDaemon(true);
+		emitterThread.start();
+
 		// process stream elements from state, since the Emit thread will start as soon as all
 		// elements from previous state are in the StreamElementQueue, we have to make sure that the
 		// order to open all operators in the operator chain proceeds from the tail operator to the
@@ -186,14 +194,6 @@ public class AsyncWaitOperator<IN, OUT>
 			recoveredStreamElements = null;
 		}
 
-		// create the emitter
-		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
-
-		// start the emitter thread
-		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
-		emitterThread.setDaemon(true);
-		emitterThread.start();
-
 	}
 
 	@Override


[3/3] flink git commit: [FLINK-8485] [client] Unblock JobSubmissionClientActor#tryToSubmitJob

Posted by tr...@apache.org.
[FLINK-8485] [client] Unblock JobSubmissionClientActor#tryToSubmitJob

The JobSubmissionClientActor blocked an ActorSystem's dispatcher thread when requesting
the BlobServer port from the cluster. This fails when using the FlinkMiniCluster on a
single-core machine because we set the number of threads to 1.

This commit unblocks the JobSubmissionClientActor#tryToSubmitJob method and sets the
lower limit of dispatcher threads to 2 when using the FlinkMiniCluster.

This closes #5360.
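
As a rough sketch of the resulting pattern (plain CompletableFuture composition; the method names fetchAddress, uploadJars, submit and reportFailure are placeholders, not the actual Flink or Akka APIs), the blocking get() is replaced by chaining the follow-up steps onto the future and routing every failure into a single whenComplete handler:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class NonBlockingSubmissionSketch {

	static CompletableFuture<String> fetchAddress() {
		// stands in for retrieving the BlobServer address asynchronously
		return CompletableFuture.supplyAsync(() -> "blob-server:50100");
	}

	static void uploadJars(String address) throws Exception {
		// stands in for uploading the user jars; may fail
	}

	static void submit() {
		System.out.println("job submitted");
	}

	static void reportFailure(Throwable t) {
		System.err.println("submission failed: " + t);
	}

	public static void main(String[] args) {
		fetchAddress()
			.thenAccept(address -> {
				try {
					uploadJars(address);
				} catch (Exception e) {
					// surface checked failures through the future instead of blocking on get()
					throw new CompletionException(e);
				}
			})
			.thenRun(NonBlockingSubmissionSketch::submit)
			.whenComplete((ignored, throwable) -> {
				if (throwable != null) {
					reportFailure(throwable);
				}
			})
			.join(); // only to keep the demo alive; the actor itself never blocks on this chain
	}
}

Together with raising parallelism-min to 2 in AkkaUtils.scala, this keeps the FlinkMiniCluster responsive even on single-core machines.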


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09edf6a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09edf6a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09edf6a6

Branch: refs/heads/release-1.4
Commit: 09edf6a62822941e8a2f0e6179b85c6fce8fd8c9
Parents: fb088bc
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 25 11:40:33 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:26:10 2018 +0100

----------------------------------------------------------------------
 .../client/JobSubmissionClientActor.java        | 131 ++++++++-----------
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   2 +-
 2 files changed, 58 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09edf6a6/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 4ca6e8b..e9824ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.client;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.dispatch.Futures;
-
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -35,14 +30,19 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedThrowable;
-import scala.concurrent.duration.FiniteDuration;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletionException;
+
+import scala.concurrent.duration.FiniteDuration;
 
 
 /**
@@ -50,11 +50,11 @@ import java.util.concurrent.TimeUnit;
  */
 public class JobSubmissionClientActor extends JobClientActor {
 
-	/** JobGraph which shall be submitted to the JobManager */
+	/** JobGraph which shall be submitted to the JobManager. */
 	private JobGraph jobGraph;
-	/** true if a SubmitJobSuccess message has been received */
+	/** true if a SubmitJobSuccess message has been received. */
 	private boolean jobSuccessfullySubmitted = false;
-	/** The cluster configuration */
+	/** The cluster configuration. */
 	private final Configuration clientConfig;
 
 	public JobSubmissionClientActor(
@@ -66,7 +66,6 @@ public class JobSubmissionClientActor extends JobClientActor {
 		this.clientConfig = clientConfig;
 	}
 
-
 	@Override
 	public void connectedToJobManager() {
 		if (jobGraph != null && !jobSuccessfullySubmitted) {
@@ -143,77 +142,61 @@ public class JobSubmissionClientActor extends JobClientActor {
 		LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
 			jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
 
-		Futures.future(new Callable<Object>() {
-			@Override
-			public Object call() throws Exception {
-				final ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
-				final AkkaJobManagerGateway akkaJobManagerGateway = new AkkaJobManagerGateway(jobManagerGateway);
+		final ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+		final AkkaJobManagerGateway akkaJobManagerGateway = new AkkaJobManagerGateway(jobManagerGateway);
 
-				LOG.info("Upload jar files to job manager {}.", jobManager.path());
+		LOG.info("Upload jar files to job manager {}.", jobManager.path());
 
-				final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(
-					akkaJobManagerGateway,
-					Time.milliseconds(timeout.toMillis()));
-				final InetSocketAddress blobServerAddress;
-
-				try {
-					blobServerAddress = blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-				} catch (Exception e) {
-					getSelf().tell(
-						decorateMessage(new JobManagerMessages.JobResultFailure(
-							new SerializedThrowable(
-								new JobSubmissionException(
-									jobGraph.getJobID(),
-									"Could not retrieve BlobServer address.",
-									e)
-							)
-						)),
-						ActorRef.noSender());
-
-					return null;
-				}
+		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(
+			akkaJobManagerGateway,
+			Time.milliseconds(timeout.toMillis()));
 
+		final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync(
+			(InetSocketAddress blobServerAddress) -> {
 				try {
 					jobGraph.uploadUserJars(blobServerAddress, clientConfig);
-				} catch (IOException exception) {
-					getSelf().tell(
-						decorateMessage(new JobManagerMessages.JobResultFailure(
-							new SerializedThrowable(
-								new JobSubmissionException(
-									jobGraph.getJobID(),
-									"Could not upload the jar files to the job manager.",
-									exception)
-							)
-						)),
-						ActorRef.noSender());
-
-					return null;
+				} catch (IOException e) {
+					throw new CompletionException(
+						new JobSubmissionException(
+							jobGraph.getJobID(),
+							"Could not upload the jar files to the job manager.",
+							e));
 				}
+			},
+			getContext().dispatcher());
+
+		jarUploadFuture
+			.thenAccept(
+				(Void ignored) -> {
+					LOG.info("Submit job to the job manager {}.", jobManager.path());
+
+					jobManager.tell(
+						decorateMessage(
+							new JobManagerMessages.SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+						getSelf());
 
-				LOG.info("Submit job to the job manager {}.", jobManager.path());
-
-				jobManager.tell(
-					decorateMessage(
-						new JobManagerMessages.SubmitJob(
-							jobGraph,
-							ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
-					getSelf());
-
-				// issue a SubmissionTimeout message to check that we submit the job within
-				// the given timeout
-				getContext().system().scheduler().scheduleOnce(
-					timeout,
-					getSelf(),
-					decorateMessage(JobClientMessages.getSubmissionTimeout()),
-					getContext().dispatcher(),
-					ActorRef.noSender());
-
-				return null;
-			}
-		}, getContext().dispatcher());
+					// issue a SubmissionTimeout message to check that we submit the job within
+					// the given timeout
+					getContext().system().scheduler().scheduleOnce(
+						timeout,
+						getSelf(),
+						decorateMessage(JobClientMessages.getSubmissionTimeout()),
+						getContext().dispatcher(),
+						ActorRef.noSender());
+				})
+			.whenComplete(
+				(Void ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						getSelf().tell(
+							decorateMessage(new JobManagerMessages.JobResultFailure(
+								new SerializedThrowable(ExceptionUtils.stripCompletionException(throwable)))),
+							ActorRef.noSender());
+					}
+				});
 	}
 
-
 	public static Props createActorProps(
 			LeaderRetrievalService leaderRetrievalService,
 			FiniteDuration timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/09edf6a6/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b80a070..ed14660 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -249,7 +249,7 @@ object AkkaUtils {
          |    default-dispatcher {
          |      fork-join-executor {
          |        parallelism-factor = 1.0
-         |        parallelism-min = 1
+         |        parallelism-min = 2
          |        parallelism-max = 4
          |      }
          |    }