You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/14 17:45:54 UTC

[flink] branch master updated: [FLINK-18935] Reject CompletedOperationCache.registerOngoingOperation if cache is shutting down

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 61caf2e  [FLINK-18935] Reject CompletedOperationCache.registerOngoingOperation if cache is shutting down
61caf2e is described below

commit 61caf2ea139684f216df873ed3b864b6cb9d4a4f
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Aug 13 10:50:22 2020 +0200

    [FLINK-18935] Reject CompletedOperationCache.registerOngoingOperation if cache is shutting down
    
    This commit changes the CompletedOperationCache so that it rejects registerOngoingOperation calls
    with an IllegalStateException if the cache is already shutting down. This ensures that now new
    operation will be enqueued while waiting for the previous operations to complete.
    
    This closes #13139.
---
 .../handler/async/CompletedOperationCache.java     | 33 ++++++++++++++++++----
 .../handler/async/CompletedOperationCacheTest.java | 27 ++++++++++++++++++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
index 95bb223..47e7c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Map;
@@ -70,6 +71,11 @@ class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseabl
 	 */
 	private final Cache<K, ResultAccessTracker<R>> completedOperations;
 
+	private final Object lock = new Object();
+
+	@Nullable
+	private CompletableFuture<Void> terminationFuture;
+
 	CompletedOperationCache() {
 		this(Ticker.systemTicker());
 	}
@@ -102,12 +108,18 @@ class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseabl
 	 * Registers an ongoing operation with the cache.
 	 *
 	 * @param operationResultFuture A future containing the operation result.
+	 * @throw IllegalStateException if the cache is already shutting down
 	 */
 	public void registerOngoingOperation(
 			final K operationKey,
 			final CompletableFuture<R> operationResultFuture) {
 		final ResultAccessTracker<R> inProgress = ResultAccessTracker.inProgress();
-		registeredOperationTriggers.put(operationKey, inProgress);
+
+		synchronized (lock) {
+			checkState(isRunning(), "The CompletedOperationCache has already been closed.");
+			registeredOperationTriggers.put(operationKey, inProgress);
+		}
+
 		operationResultFuture.whenComplete((result, error) -> {
 			if (error == null) {
 				completedOperations.put(operationKey, inProgress.finishOperation(Either.Right(result)));
@@ -118,6 +130,11 @@ class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseabl
 		});
 	}
 
+	@GuardedBy("lock")
+	private boolean isRunning() {
+		return terminationFuture == null;
+	}
+
 	/**
 	 * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
 	 * finished, otherwise {@code null}.
@@ -139,10 +156,16 @@ class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseabl
 
 	@Override
 	public CompletableFuture<Void> closeAsync() {
-		return FutureUtils.orTimeout(
-			asyncWaitForResultsToBeAccessed(),
-			COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS,
-			TimeUnit.SECONDS);
+		synchronized (lock) {
+			if (isRunning()) {
+					terminationFuture = FutureUtils.orTimeout(
+						asyncWaitForResultsToBeAccessed(),
+						COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS,
+						TimeUnit.SECONDS);
+			}
+
+			return terminationFuture;
+		}
 	}
 
 	private CompletableFuture<Void> asyncWaitForResultsToBeAccessed() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
index 2d13780..c8a2a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
@@ -32,7 +32,9 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link CompletedOperationCache}.
@@ -77,4 +79,29 @@ public class CompletedOperationCacheTest extends TestLogger {
 		assertThat(operationResultOrError.right(), is(equalTo(TEST_OPERATION_RESULT.get())));
 		assertThat(closeCacheFuture.isDone(), is(true));
 	}
+
+	@Test
+	public void testCannotAddOperationAfterClosing() {
+		completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, new CompletableFuture<>());
+		final CompletableFuture<Void> terminationFuture = completedOperationCache.closeAsync();
+
+		assertFalse(terminationFuture.isDone());
+
+		try {
+			completedOperationCache.registerOngoingOperation(new OperationKey(new TriggerId()), new CompletableFuture<>());
+			fail("It should no longer be possible to register new operations because the cache is shutting down.");
+		} catch (IllegalStateException ignored) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCanGetOperationResultAfterClosing() throws Exception {
+		completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+		completedOperationCache.closeAsync();
+
+		final Either<Throwable, String> result = completedOperationCache.get(TEST_OPERATION_KEY);
+
+		assertThat(result.right(), is(equalTo(TEST_OPERATION_RESULT.get())));
+	}
 }