You are viewing a plain text version of this content. The canonical link for it is not included in this plain text version.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 12:36:26 UTC

[flink] branch release-1.9 updated (42a475d -> b5aa679)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 42a475d  [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
     new 67184c2  [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread
     new 1568ecc  [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool
     new 2678e69  [hotfix] Fix IOUtils.closeAll exceptions aggregation
     new b5aa679  [hotfix][runtime] SynchronousSavepointLatch: check completion condition in the blocking method in case of spurious wakeups

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/flink/util/IOUtils.java   |   2 +-
 .../BlockingCallMonitoringThreadPool.java          | 127 ------------------
 .../org/apache/flink/runtime/taskmanager/Task.java |  36 ++----
 .../BlockingCallMonitoringThreadPoolTest.java      | 112 ----------------
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  11 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  29 ++---
 .../runtime/tasks/SynchronousSavepointLatch.java   |  61 ++++-----
 .../runtime/tasks/SourceTaskTerminationTest.java   | 103 +++++----------
 .../runtime/tasks/SynchronousCheckpointITCase.java |  27 +---
 .../tasks/SynchronousSavepointSyncLatchTest.java   | 144 +++++++++++----------
 10 files changed, 168 insertions(+), 484 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java


[flink] 01/04: [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 67184c27ca46b3562f0a9651663a091a17dadbbc
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Mon Jul 8 17:11:46 2019 +0200

    [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  29 ++---
 .../runtime/tasks/SynchronousSavepointLatch.java   |  55 ++++----
 .../runtime/tasks/SourceTaskTerminationTest.java   | 103 +++++----------
 .../runtime/tasks/SynchronousCheckpointITCase.java |  27 +---
 .../tasks/SynchronousSavepointSyncLatchTest.java   | 144 +++++++++++----------
 5 files changed, 148 insertions(+), 210 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2f4dd6a..f9fe110 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -702,7 +702,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			CheckpointMetrics checkpointMetrics) throws Exception {
 
 		try {
-			performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false);
+			if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false)) {
+				if (syncSavepointLatch.isSet()) {
+					syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
+				}
+			}
 		}
 		catch (CancelTaskException e) {
 			LOG.info("Operator {} was cancelled while performing checkpoint {}.",
@@ -739,7 +743,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		final long checkpointId = checkpointMetaData.getCheckpointId();
 
-		final boolean result;
 		synchronized (lock) {
 			if (isRunning) {
 
@@ -770,7 +773,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				//           impact progress of the streaming topology
 				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
 
-				result = true;
+				return true;
 			}
 			else {
 				// we cannot perform our checkpoint - let the downstream operators know that they
@@ -795,21 +798,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					throw exception;
 				}
 
-				result = false;
-			}
-		}
-
-		if (isRunning && syncSavepointLatch.isSet()) {
-
-			final boolean checkpointWasAcked =
-					syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
-
-			if (checkpointWasAcked) {
-				finishTask();
+				return false;
 			}
 		}
-
-		return result;
 	}
 
 	public ExecutorService getAsyncOperationsThreadPool() {
@@ -818,6 +809,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		boolean success = false;
 		synchronized (lock) {
 			if (isRunning) {
 				LOG.debug("Notification of complete checkpoint for task {}", getName());
@@ -828,12 +820,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					}
 				}
 
-				syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId);
+				success = true;
 			}
 			else {
 				LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
 			}
 		}
+		if (success) {
+			syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask);
+		}
 	}
 
 	private void tryShutdownTimerService() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
index 0a67dac..d10c19c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.util.function.RunnableWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * A synchronization primitive used by the {@link StreamTask} to wait
@@ -29,12 +33,17 @@ class SynchronousSavepointLatch {
 
 	private static final long NOT_SET_CHECKPOINT_ID = -1L;
 
-	// these are mutually exclusive
+	enum CompletionResult {
+		COMPLETED,
+		CANCELED,
+	}
+
+	@GuardedBy("synchronizationPoint")
 	private volatile boolean waiting;
-	private volatile boolean completed;
-	private volatile boolean canceled;
 
-	private volatile boolean wasAlreadyCompleted;
+	@GuardedBy("synchronizationPoint")
+	@Nullable
+	private volatile CompletionResult completionResult;
 
 	private final Object synchronizationPoint;
 
@@ -44,8 +53,6 @@ class SynchronousSavepointLatch {
 		this.synchronizationPoint = new Object();
 
 		this.waiting = false;
-		this.completed = false;
-		this.canceled = false;
 		this.checkpointId = NOT_SET_CHECKPOINT_ID;
 	}
 
@@ -59,45 +66,38 @@ class SynchronousSavepointLatch {
 		}
 	}
 
-	boolean blockUntilCheckpointIsAcknowledged() throws Exception {
+	void blockUntilCheckpointIsAcknowledged() throws Exception {
 		synchronized (synchronizationPoint) {
-			if (isSet() && !isDone()) {
+			if (completionResult == null && isSet()) {
 				waiting = true;
 				synchronizationPoint.wait();
 				waiting = false;
 			}
-
-			if (!isCanceled() && !wasAlreadyCompleted) {
-				wasAlreadyCompleted = true;
-				return true;
-			}
-
-			return false;
 		}
 	}
 
-	void acknowledgeCheckpointAndTrigger(final long checkpointId) {
+	void acknowledgeCheckpointAndTrigger(final long checkpointId, RunnableWithException runnable) throws Exception {
 		synchronized (synchronizationPoint) {
-			if (isSet() && !isDone() && this.checkpointId == checkpointId) {
-				completed = true;
-				synchronizationPoint.notifyAll();
+			if (completionResult == null && this.checkpointId == checkpointId) {
+				completionResult = CompletionResult.COMPLETED;
+				try {
+					runnable.run();
+				} finally {
+					synchronizationPoint.notifyAll();
+				}
 			}
 		}
 	}
 
 	void cancelCheckpointLatch() {
 		synchronized (synchronizationPoint) {
-			if (!isDone()) {
-				canceled = true;
+			if (completionResult == null) {
+				completionResult = CompletionResult.CANCELED;
 				synchronizationPoint.notifyAll();
 			}
 		}
 	}
 
-	private boolean isDone () {
-		return canceled || completed;
-	}
-
 	@VisibleForTesting
 	boolean isWaiting() {
 		return waiting;
@@ -105,15 +105,14 @@ class SynchronousSavepointLatch {
 
 	@VisibleForTesting
 	boolean isCompleted() {
-		return completed;
+		return completionResult == CompletionResult.COMPLETED;
 	}
 
 	@VisibleForTesting
 	boolean isCanceled() {
-		return canceled;
+		return completionResult == CompletionResult.CANCELED;
 	}
 
-	@VisibleForTesting
 	boolean isSet() {
 		return checkpointId != NOT_SET_CHECKPOINT_ID;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
index 9295788..4a613d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -33,62 +32,56 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * A test verifying the termination process
  * (synchronous checkpoint and task termination) at the {@link SourceStreamTask}.
  */
-public class SourceTaskTerminationTest {
+public class SourceTaskTerminationTest extends TestLogger {
 
 	private static OneShotLatch ready;
 	private static MultiShotLatch runLoopStart;
 	private static MultiShotLatch runLoopEnd;
 
-	private static AtomicReference<Throwable> error;
+	@Rule
+	public final Timeout timeoutPerTest = Timeout.seconds(20);
 
 	@Before
 	public void initialize() {
 		ready = new OneShotLatch();
 		runLoopStart = new MultiShotLatch();
 		runLoopEnd = new MultiShotLatch();
-		error = new AtomicReference<>();
-
-		error.set(null);
-	}
-
-	@After
-	public void validate() {
-		validateNoExceptionsWereThrown();
 	}
 
 	@Test
-	public void terminateShouldBlockDuringCheckpointingAndEmitMaxWatermark() throws Exception {
+	public void testStopWithSavepointWithMaxWatermark() throws Exception {
 		stopWithSavepointStreamTaskTestHelper(true);
 	}
 
 	@Test
-	public void suspendShouldBlockDuringCheckpointingAndNotEmitMaxWatermark() throws Exception {
+	public void testStopWithSavepointWithoutMaxWatermark() throws Exception {
 		stopWithSavepointStreamTaskTestHelper(false);
 	}
 
-	private void stopWithSavepointStreamTaskTestHelper(final boolean expectMaxWatermark) throws Exception {
+	private void stopWithSavepointStreamTaskTestHelper(final boolean withMaxWatermark) throws Exception {
 		final long syncSavepointId = 34L;
 
 		final StreamTaskTestHarness<Long> srcTaskTestHarness = getSourceStreamTaskTestHarness();
 		final Thread executionThread = srcTaskTestHarness.invoke();
 		final StreamTask<Long, ?> srcTask = srcTaskTestHarness.getTask();
+		final SynchronousSavepointLatch syncSavepointLatch = srcTask.getSynchronousSavepointLatch();
 
 		ready.await();
 
@@ -96,15 +89,29 @@ public class SourceTaskTerminationTest {
 		emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L);
 		emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L);
 
-		emitAndVerifyCheckpoint(srcTaskTestHarness, srcTask, 31L);
+		srcTask.triggerCheckpoint(
+				new CheckpointMetaData(31L, 900),
+				CheckpointOptions.forCheckpointWithDefaultLocation(),
+				false);
+
+		assertFalse(syncSavepointLatch.isSet());
+		assertFalse(syncSavepointLatch.isCompleted());
+		assertFalse(syncSavepointLatch.isWaiting());
+
+		verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
 
 		emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L);
 
-		final Thread syncSavepointThread = triggerSynchronousSavepointFromDifferentThread(srcTask, expectMaxWatermark, syncSavepointId);
+		srcTask.triggerCheckpoint(
+				new CheckpointMetaData(syncSavepointId, 900),
+				new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
+				withMaxWatermark);
 
-		final SynchronousSavepointLatch syncSavepointFuture = waitForSyncSavepointFutureToBeSet(srcTask);
+		assertTrue(syncSavepointLatch.isSet());
+		assertFalse(syncSavepointLatch.isCompleted());
+		assertFalse(syncSavepointLatch.isWaiting());
 
-		if (expectMaxWatermark) {
+		if (withMaxWatermark) {
 			// if we are in TERMINATE mode, we expect the source task
 			// to emit MAX_WM before the SYNC_SAVEPOINT barrier.
 			verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
@@ -112,54 +119,12 @@ public class SourceTaskTerminationTest {
 
 		verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), syncSavepointId);
 
-		assertFalse(syncSavepointFuture.isCompleted());
-		assertTrue(syncSavepointFuture.isWaiting());
-
 		srcTask.notifyCheckpointComplete(syncSavepointId);
-		assertTrue(syncSavepointFuture.isCompleted());
+		assertTrue(syncSavepointLatch.isCompleted());
 
-		syncSavepointThread.join();
 		executionThread.join();
 	}
 
-	private void validateNoExceptionsWereThrown() {
-		if (error.get() != null && !(error.get() instanceof CancelTaskException)) {
-			fail(error.get().getMessage());
-		}
-	}
-
-	private Thread triggerSynchronousSavepointFromDifferentThread(
-			final StreamTask<Long, ?> task,
-			final boolean advanceToEndOfEventTime,
-			final long syncSavepointId) {
-		final Thread checkpointingThread = new Thread(() -> {
-			try {
-				task.triggerCheckpoint(
-						new CheckpointMetaData(syncSavepointId, 900),
-						new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
-						advanceToEndOfEventTime);
-			} catch (Exception e) {
-				error.set(e);
-			}
-		});
-		checkpointingThread.start();
-
-		return checkpointingThread;
-
-	}
-
-	private void emitAndVerifyCheckpoint(
-			final StreamTaskTestHarness<Long> srcTaskTestHarness,
-			final StreamTask<Long, ?> srcTask,
-			final long checkpointId) throws Exception {
-
-		srcTask.triggerCheckpoint(
-				new CheckpointMetaData(checkpointId, 900),
-				CheckpointOptions.forCheckpointWithDefaultLocation(),
-				false);
-		verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), checkpointId);
-	}
-
 	private StreamTaskTestHarness<Long> getSourceStreamTaskTestHarness() {
 		final StreamTaskTestHarness<Long> testHarness = new StreamTaskTestHarness<>(
 				SourceStreamTask::new,
@@ -177,16 +142,6 @@ public class SourceTaskTerminationTest {
 		return testHarness;
 	}
 
-	private SynchronousSavepointLatch waitForSyncSavepointFutureToBeSet(final StreamTask streamTaskUnderTest) throws InterruptedException {
-		final SynchronousSavepointLatch syncSavepointFuture = streamTaskUnderTest.getSynchronousSavepointLatch();
-		while (!syncSavepointFuture.isWaiting()) {
-			Thread.sleep(10L);
-
-			validateNoExceptionsWereThrown();
-		}
-		return syncSavepointFuture;
-	}
-
 	private void emitAndVerifyWatermarkAndElement(
 			final StreamTaskTestHarness<Long> srcTaskTestHarness,
 			final long expectedElement) throws InterruptedException {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index bac9d43..3b4aee3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
@@ -68,12 +67,10 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -88,8 +85,6 @@ import static org.mockito.Mockito.when;
  */
 public class SynchronousCheckpointITCase {
 
-	private static OneShotLatch checkpointTriggered = new OneShotLatch();
-
 	// A thread-safe queue to "log" and monitor events happening in the task's methods. Also, used by the test thread
 	// to synchronize actions with the task's threads.
 	private static LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
@@ -98,7 +93,7 @@ public class SynchronousCheckpointITCase {
 	public final Timeout timeoutPerTest = Timeout.seconds(10);
 
 	@Test
-	public void taskCachedThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
+	public void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
 		final Task task = createTask(SynchronousCheckpointTestingTask.class);
 
 		try (TaskCleaner ignored = new TaskCleaner(task)) {
@@ -109,12 +104,6 @@ public class SynchronousCheckpointITCase {
 
 			assertEquals(ExecutionState.RUNNING, task.getExecutionState());
 
-			// Hack: we are triggering a checkpoint with advanceToEndOfEventTime = true, to be sure that
-			// triggerCheckpointBarrier has reached the sync checkpoint latch (by verifying in
-			// SynchronousCheckpointTestingTask.advanceToEndOfEventTime) and only then proceeding to
-			// notifyCheckpointComplete.
-			// Without such synchronization, the notifyCheckpointComplete execution may be executed first and leave this
-			// test in a deadlock.
 			task.triggerCheckpointBarrier(
 					42,
 					156865867234L,
@@ -122,16 +111,13 @@ public class SynchronousCheckpointITCase {
 					true);
 
 			assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT));
+			assertThat(eventQueue.take(), is(Event.POST_TRIGGER_CHECKPOINT));
 			assertTrue(eventQueue.isEmpty());
 
-			checkpointTriggered.await();
-
 			task.notifyCheckpointComplete(42);
 
 			assertThat(eventQueue.take(), is(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE));
-			assertThat(
-				Arrays.asList(eventQueue.take(), eventQueue.take()),
-				containsInAnyOrder(Event.POST_NOTIFY_CHECKPOINT_COMPLETE, Event.POST_TRIGGER_CHECKPOINT));
+			assertThat(eventQueue.take(), is(Event.POST_NOTIFY_CHECKPOINT_COMPLETE));
 			assertTrue(eventQueue.isEmpty());
 
 			assertEquals(ExecutionState.RUNNING, task.getExecutionState());
@@ -196,13 +182,6 @@ public class SynchronousCheckpointITCase {
 		protected void cleanup() {
 
 		}
-
-		@Override
-		protected void advanceToEndOfEventTime() throws Exception {
-			// Wake up the test thread that we have actually entered the checkpoint invocation and the sync checkpoint
-			// latch is set.
-			checkpointTriggered.trigger();
-		}
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
index fa2f9b7..59f5a81 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.util.function.RunnableWithException;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -53,120 +56,127 @@ public class SynchronousSavepointSyncLatchTest {
 	}
 
 	@Test
-	public void waitAndThenTriggerWorks() throws Exception {
+	public void triggerUnblocksWait() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
-		final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L);
 
-		executors.submit(callable);
+		latchUnderTest.setCheckpointId(1L);
+		assertFalse(latchUnderTest.isWaiting());
 
-		while (!latchUnderTest.isSet()) {
+		Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest);
+		while (!latchUnderTest.isWaiting()) {
 			Thread.sleep(5L);
 		}
 
+		final AtomicBoolean triggered = new AtomicBoolean();
+
 		// wrong checkpoint id.
-		latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
+		latchUnderTest.acknowledgeCheckpointAndTrigger(2L, () -> triggered.set(true));
+		assertFalse(triggered.get());
+		assertFalse(latchUnderTest.isCompleted());
 		assertTrue(latchUnderTest.isWaiting());
 
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+		latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true));
+		assertTrue(triggered.get());
 		assertTrue(latchUnderTest.isCompleted());
+
+		future.get();
+		assertFalse(latchUnderTest.isWaiting());
 	}
 
 	@Test
-	public void waitAndThenCancelWorks() throws Exception {
+	public void cancelUnblocksWait() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
-		final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L);
 
-		final Future<Boolean> resultFuture = executors.submit(callable);
+		latchUnderTest.setCheckpointId(1L);
+		assertFalse(latchUnderTest.isWaiting());
 
-		while (!latchUnderTest.isSet()) {
+		Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest);
+		while (!latchUnderTest.isWaiting()) {
 			Thread.sleep(5L);
 		}
 
 		latchUnderTest.cancelCheckpointLatch();
-
-		boolean result = resultFuture.get();
-
-		assertFalse(result);
 		assertTrue(latchUnderTest.isCanceled());
+
+		future.get();
+		assertFalse(latchUnderTest.isWaiting());
 	}
 
 	@Test
-	public void triggeringReturnsTrueAtMostOnce() throws Exception {
+	public void waitAfterTriggerIsNotBlocking() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
 
-		final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
-		final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
-
-		final Future<Boolean> firstFuture = executors.submit(firstCallable);
-		final Future<Boolean> secondFuture = executors.submit(secondCallable);
+		latchUnderTest.setCheckpointId(1L);
+		latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> {});
 
-		while (!latchUnderTest.isSet()) {
-			Thread.sleep(5L);
-		}
+		latchUnderTest.blockUntilCheckpointIsAcknowledged();
+	}
 
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+	@Test
+	public void waitAfterCancelIsNotBlocking() throws Exception {
+		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
 
-		final boolean firstResult = firstFuture.get();
-		final boolean secondResult = secondFuture.get();
+		latchUnderTest.setCheckpointId(1L);
+		latchUnderTest.cancelCheckpointLatch();
+		assertTrue(latchUnderTest.isCanceled());
 
-		// only one of the two can be true (it is a race so we do not know which one)
-		assertTrue(firstResult ^ secondResult);
+		latchUnderTest.blockUntilCheckpointIsAcknowledged();
 	}
 
 	@Test
-	public void waitAfterTriggerReturnsTrueImmediately() throws Exception {
+	public void triggeringInvokesCallbackAtMostOnce() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
+
 		latchUnderTest.setCheckpointId(1L);
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
-		final boolean triggerred = latchUnderTest.blockUntilCheckpointIsAcknowledged();
-		assertTrue(triggerred);
+
+		AtomicInteger counter = new AtomicInteger();
+		Future<Void> future1 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+		Future<Void> future2 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+		Future<Void> future3 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+		future1.get();
+		future2.get();
+		future3.get();
+
+		assertEquals(1, counter.get());
 	}
 
 	@Test
-	public void waitAfterCancelDoesNothing() throws Exception {
+	public void triggeringAfterCancelDoesNotInvokeCallback() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
+
 		latchUnderTest.setCheckpointId(1L);
 		latchUnderTest.cancelCheckpointLatch();
-		latchUnderTest.blockUntilCheckpointIsAcknowledged();
+		assertTrue(latchUnderTest.isCanceled());
+
+		final AtomicBoolean triggered = new AtomicBoolean();
+		latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true));
+		assertFalse(triggered.get());
 	}
 
 	@Test
-	public void checkpointIdIsSetOnlyOnce() throws InterruptedException {
+	public void checkpointIdIsSetOnlyOnce() {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
 
-		final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
-		executors.submit(firstCallable);
-
-		while (!latchUnderTest.isSet()) {
-			Thread.sleep(5L);
-		}
-
-		final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 2L);
-		executors.submit(secondCallable);
-
-		latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
-		assertTrue(latchUnderTest.isWaiting());
+		latchUnderTest.setCheckpointId(1L);
+		assertTrue(latchUnderTest.isSet());
+		assertEquals(1L, latchUnderTest.getCheckpointId());
 
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
-		assertTrue(latchUnderTest.isCompleted());
+		latchUnderTest.setCheckpointId(2L);
+		assertTrue(latchUnderTest.isSet());
+		assertEquals(1L, latchUnderTest.getCheckpointId());
 	}
 
-	private static final class WaitingOnLatchCallable implements Callable<Boolean> {
-
-		private final SynchronousSavepointLatch latch;
-		private final long checkpointId;
-
-		WaitingOnLatchCallable(
-				final SynchronousSavepointLatch latch,
-				final long checkpointId) {
-			this.latch = checkNotNull(latch);
-			this.checkpointId = checkpointId;
-		}
+	private Future<Void> runThreadWaitingForCheckpointAck(SynchronousSavepointLatch latch) {
+		return executors.submit(() -> {
+			latch.blockUntilCheckpointIsAcknowledged();
+			return null;
+		});
+	}
 
-		@Override
-		public Boolean call() throws Exception {
-			latch.setCheckpointId(checkpointId);
-			return latch.blockUntilCheckpointIsAcknowledged();
-		}
+	private Future<Void> runThreadTriggeringCheckpoint(SynchronousSavepointLatch latch, long checkpointId, RunnableWithException runnable) {
+		return executors.submit(() -> {
+			latch.acknowledgeCheckpointAndTrigger(checkpointId, runnable);
+			return null;
+		});
 	}
 }


[flink] 03/04: [hotfix] Fix IOUtils.closeAll exceptions aggregation

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2678e6958d62ebedae88eef6bab6f9f7f74d3b65
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Wed Jul 10 15:53:02 2019 +0200

    [hotfix] Fix IOUtils.closeAll exceptions aggregation
---
 flink-core/src/main/java/org/apache/flink/util/IOUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 498c899..02b11e6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -231,7 +231,7 @@ public final class IOUtils {
 						closeable.close();
 					}
 				} catch (Exception e) {
-					collectedExceptions = ExceptionUtils.firstOrSuppressed(collectedExceptions, e);
+					collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);
 				}
 			}
 


[flink] 02/04: [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1568ecc67def90237d151b4f43de8d4c16ee6116
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Tue Jun 4 14:38:13 2019 +0200

    [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool
---
 .../BlockingCallMonitoringThreadPool.java          | 127 ---------------------
 .../org/apache/flink/runtime/taskmanager/Task.java |  36 +++---
 .../BlockingCallMonitoringThreadPoolTest.java      | 112 ------------------
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  11 +-
 4 files changed, 15 insertions(+), 271 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
deleted file mode 100644
index d0fb868..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A Thread Pool used to monitor the number of in-flight calls that block and wait for another task executed
- * by the same pool in order to get unblocked. When a call (blocking or non-blocking) is submitted, the size
- * of the pool is set to {@code 1 + activeBlockingCalls}. This allows the thread pool size to follow the needs
- * of the system and to avoid any redundant idle threads consuming resources.
- */
-public class BlockingCallMonitoringThreadPool {
-
-	private static final Logger LOG = LoggerFactory.getLogger(BlockingCallMonitoringThreadPool.class);
-
-	private final AtomicInteger inFlightBlockingCallCounter = new AtomicInteger(0);
-
-	private final ThreadPoolExecutor executor;
-
-	public BlockingCallMonitoringThreadPool() {
-		this(Executors.defaultThreadFactory());
-	}
-
-	public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFactory) {
-		this.executor = new ThreadPoolExecutor(
-				1,
-				1,
-				10L,
-				TimeUnit.SECONDS,
-				new LinkedBlockingQueue<>(),
-				checkNotNull(dispatcherThreadFactory));
-	}
-
-	public CompletableFuture<?> submit(final Runnable runnable, final boolean blocking) {
-		if (blocking) {
-			return submitBlocking(runnable);
-		} else {
-			return submit(runnable);
-		}
-	}
-
-	private CompletableFuture<?> submit(final Runnable task) {
-		adjustThreadPoolSize(inFlightBlockingCallCounter.get());
-		return CompletableFuture.runAsync(task, executor);
-	}
-
-	private CompletableFuture<?> submitBlocking(final Runnable task) {
-		adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
-		return CompletableFuture.runAsync(task, executor).whenComplete(
-				(ignored, e) -> inFlightBlockingCallCounter.decrementAndGet());
-	}
-
-	private void adjustThreadPoolSize(final int activeBlockingCalls) {
-		if (activeBlockingCalls > 1) {
-			LOG.debug("There are {} active threads with blocking calls", activeBlockingCalls);
-		}
-
-		final int newPoolSize = 1 + activeBlockingCalls;
-
-		// We have to reset the core pool size because (quoted from the official docs):
-		// ``
-		// If there are more than corePoolSize but less than maximumPoolSize threads running,
-		// ** a new thread will be created ONLY IF THE QUEUE IS FULL **.
-		// ``
-
-		// ensure that regardless of whether we increase/reduce the pool size, maximum is always >= core
-		if (newPoolSize < executor.getCorePoolSize()) {
-			executor.setCorePoolSize(newPoolSize);
-			executor.setMaximumPoolSize(newPoolSize);
-		} else {
-			executor.setMaximumPoolSize(newPoolSize);
-			executor.setCorePoolSize(newPoolSize);
-		}
-	}
-
-	public void shutdown() {
-		executor.shutdown();
-	}
-
-	public boolean isShutdown() {
-		return executor.isShutdown();
-	}
-
-	public void shutdownNow() {
-		executor.shutdownNow();
-	}
-
-	@VisibleForTesting
-	int getMaximumPoolSize() {
-		return executor.getMaximumPoolSize();
-	}
-
-	@VisibleForTesting
-	int getQueueSize() {
-		return executor.getQueue().size();
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4355821..d4e1d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -93,6 +93,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -257,8 +259,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 	/** The observed exception, in case the task execution failed. */
 	private volatile Throwable failureCause;
 
-	/** Executor for asynchronous calls (checkpoints, etc), lazily initialized. */
-	private volatile BlockingCallMonitoringThreadPool asyncCallDispatcher;
+	/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized. */
+	private volatile ExecutorService asyncCallDispatcher;
 
 	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
 	private long taskCancellationInterval;
@@ -789,7 +791,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
 				// stop the async dispatcher.
 				// copy dispatcher reference to stack, against concurrent release
-				final BlockingCallMonitoringThreadPool dispatcher = this.asyncCallDispatcher;
+				ExecutorService dispatcher = this.asyncCallDispatcher;
 				if (dispatcher != null && !dispatcher.isShutdown()) {
 					dispatcher.shutdownNow();
 				}
@@ -1153,8 +1155,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			};
 			executeAsyncCallRunnable(
 					runnable,
-					String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId),
-					checkpointOptions.getCheckpointType().isSynchronous());
+					String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
 		}
 		else {
 			LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
@@ -1189,8 +1190,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			};
 			executeAsyncCallRunnable(
 					runnable,
-					"Checkpoint Confirmation for " + taskNameWithSubtask,
-					false);
+					"Checkpoint Confirmation for " + taskNameWithSubtask
+			);
 		}
 		else {
 			LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
@@ -1201,11 +1202,10 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
 	/**
 	 * Utility method to dispatch an asynchronous call on the invokable.
-	 *
-	 * @param runnable The async call runnable.
+	 * @param runnable The async call runnable.
 	 * @param callName The name of the call, for logging purposes.
 	 */
-	private void executeAsyncCallRunnable(Runnable runnable, String callName, boolean blocking) {
+	private void executeAsyncCallRunnable(Runnable runnable, String callName) {
 		// make sure the executor is initialized. lock against concurrent calls to this function
 		synchronized (this) {
 			if (executionState != ExecutionState.RUNNING) {
@@ -1213,20 +1213,12 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			}
 
 			// get ourselves a reference on the stack that cannot be concurrently modified
-			BlockingCallMonitoringThreadPool executor = this.asyncCallDispatcher;
+			ExecutorService executor = this.asyncCallDispatcher;
 			if (executor == null) {
 				// first time use, initialize
 				checkState(userCodeClassLoader != null, "userCodeClassLoader must not be null");
 
-				// Under normal execution, we expect that one thread will suffice, this is why we
-				// keep the core threads to 1. In the case of a synchronous savepoint, we will block
-				// the checkpointing thread, so we need an additional thread to execute the
-				// notifyCheckpointComplete() callback. Finally, we aggressively purge (potentially)
-				// idle thread so that we do not risk to have many idle thread on machines with multiple
-				// tasks on them. Either way, only one of them can execute at a time due to the
-				// checkpoint lock.
-
-				executor = new BlockingCallMonitoringThreadPool(
+				executor = Executors.newSingleThreadExecutor(
 						new DispatcherThreadFactory(
 							TASK_THREADS_GROUP,
 							"Async calls on " + taskNameWithSubtask,
@@ -1245,13 +1237,13 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			LOG.debug("Invoking async call {} on task {}", callName, taskNameWithSubtask);
 
 			try {
-				executor.submit(runnable, blocking);
+				executor.submit(runnable);
 			}
 			catch (RejectedExecutionException e) {
 				// may be that we are concurrently finished or canceled.
 				// if not, report that something is fishy
 				if (executionState == ExecutionState.RUNNING) {
-					throw new RuntimeException("Async call with a " + (blocking ? "" : "non-") + "blocking call was rejected, even though the task is running.", e);
+					throw new RuntimeException("Async call was rejected, even though the task is running.", e);
 				}
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
deleted file mode 100644
index 2cc3454..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link BlockingCallMonitoringThreadPool}.
- */
-public class BlockingCallMonitoringThreadPoolTest {
-
-	private final static int TIME_OUT = 30;
-
-	private final OneShotLatch latch1 = new OneShotLatch();
-	private final OneShotLatch latch2 = new OneShotLatch();
-	private BlockingCallMonitoringThreadPool blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
-
-	@Before
-	public void setup() {
-		blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
-		latch1.reset();
-		latch2.reset();
-	}
-
-	@After
-	public void tearDown() {
-		latch1.trigger();
-		latch2.trigger();
-		blockingCallThreadPool.shutdown();
-	}
-
-	@Test
-	public void testSubmitNonBlockingCalls() throws Exception {
-		blockingCallThreadPool.submit(() -> await(latch1), false);
-		blockingCallThreadPool.submit(() -> await(latch2), false);
-
-		assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
-		assertEquals(1, blockingCallThreadPool.getQueueSize());
-	}
-
-	@Test
-	public void testSubmitBlockingCall() throws Exception {
-		CompletableFuture<?> latch1Future = blockingCallThreadPool.submit(() -> await(latch1), true);
-		CompletableFuture<?> latch2Future = blockingCallThreadPool.submit(() -> await(latch2), false);
-
-		assertEquals(2, blockingCallThreadPool.getMaximumPoolSize());
-		assertEquals(0, blockingCallThreadPool.getQueueSize());
-
-		latch2.trigger();
-		latch2Future.get(TIME_OUT, TimeUnit.SECONDS);
-
-		assertFalse(latch1Future.isDone());
-		assertTrue(latch2Future.isDone());
-	}
-
-	@Test
-	public void testDownsizePool() throws Exception {
-		List<CompletableFuture<?>> futures = new ArrayList<>();
-
-		futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
-		futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
-		futures.add(blockingCallThreadPool.submit(() -> await(latch1), false));
-
-		assertEquals(3, blockingCallThreadPool.getMaximumPoolSize());
-
-		latch1.trigger();
-
-		for (CompletableFuture<?> future : futures) {
-			future.get(TIME_OUT, TimeUnit.SECONDS);
-		}
-
-		blockingCallThreadPool.submit(() -> await(latch1), false).get(TIME_OUT, TimeUnit.SECONDS);
-		assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
-	}
-
-	private void await(OneShotLatch latch) {
-		try {
-			latch.await();
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5e43f68..f7b366b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -62,7 +62,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -130,16 +129,12 @@ public class TaskAsyncCallTest extends TestLogger {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Tests 
+	//  Tests
 	// ------------------------------------------------------------------------
 
 	@Test
-	@Ignore
 	public void testCheckpointCallsInOrder() throws Exception {
 
-		// test ignored because with the changes introduced by [FLINK-11667],
-		// there is not guarantee about the order in which checkpoints are executed.
-
 		Task task = createTask(CheckpointsInOrderInvokable.class);
 		try (TaskCleaner ignored = new TaskCleaner(task)) {
 			task.startTaskThread();
@@ -160,12 +155,8 @@ public class TaskAsyncCallTest extends TestLogger {
 	}
 
 	@Test
-	@Ignore
 	public void testMixedAsyncCallsInOrder() throws Exception {
 
-		// test ignored because with the changes introduced by [FLINK-11667],
-		// there is not guarantee about the order in which checkpoints are executed.
-
 		Task task = createTask(CheckpointsInOrderInvokable.class);
 		try (TaskCleaner ignored = new TaskCleaner(task)) {
 			task.startTaskThread();


[flink] 04/04: [hotfix][runtime] SynchronousSavepointLatch: check completion condition in the blocking method in case of spurious wakeups

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5aa6792bb04eaabb86bf9d73f12ec8441153f77
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Wed Jul 10 17:17:05 2019 +0200

    [hotfix][runtime] SynchronousSavepointLatch: check completion condition in the blocking method in case of spurious wakeups
---
 .../streaming/runtime/tasks/SynchronousSavepointLatch.java     | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
index d10c19c..a6a583d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
@@ -66,11 +66,13 @@ class SynchronousSavepointLatch {
 		}
 	}
 
-	void blockUntilCheckpointIsAcknowledged() throws Exception {
+	void blockUntilCheckpointIsAcknowledged() throws InterruptedException {
 		synchronized (synchronizationPoint) {
-			if (completionResult == null && isSet()) {
-				waiting = true;
-				synchronizationPoint.wait();
+			if (isSet()) {
+				while (completionResult == null) {
+					waiting = true;
+					synchronizationPoint.wait();
+				}
 				waiting = false;
 			}
 		}