You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/24 09:52:11 UTC

flink git commit: [FLINK-5372][tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

Repository: flink
Updated Branches:
  refs/heads/master 614b1e29e -> 34254886f


[FLINK-5372][tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34254886
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34254886
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34254886

Branch: refs/heads/master
Commit: 34254886fb51c2f25a025077e59a091c2701fb04
Parents: 614b1e2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Apr 23 17:25:46 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Apr 24 11:52:02 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/OperatorStateBackendTest.java   |  6 +++++-
 .../util/BlockerCheckpointStreamFactory.java      | 18 ++++++++++++------
 .../streaming/state/RocksDBAsyncSnapshotTest.java | 18 +++++++++++++-----
 .../streaming/state/RocksDBStateBackendTest.java  | 13 +++++++++++--
 4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index d6d4af7..16e5188 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableL
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -834,7 +835,10 @@ public class OperatorStateBackendTest {
 
 		// cancel the future, which should close the underlying stream
 		runnableFuture.cancel(true);
-		Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed());
+
+		for (BlockingCheckpointOutputStream stream : streamFactory.getAllCreatedStreams()) {
+			Assert.assertTrue(stream.isClosed());
+		}
 
 		// we allow the stream under test to proceed
 		blockerLatch.trigger();

http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index ce41b10..b1bb0ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO.
@@ -39,14 +41,15 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 	protected volatile OneShotLatch blocker;
 	protected volatile OneShotLatch waiter;
 
-	BlockingCheckpointOutputStream lastCreatedStream;
+	protected final Set<BlockingCheckpointOutputStream> allCreatedStreams;
 
-	public BlockingCheckpointOutputStream getLastCreatedStream() {
-		return lastCreatedStream;
+	public Set<BlockingCheckpointOutputStream> getAllCreatedStreams() {
+		return allCreatedStreams;
 	}
 
 	public BlockerCheckpointStreamFactory(int maxSize) {
 		this.maxSize = maxSize;
+		this.allCreatedStreams = new HashSet<>();
 	}
 
 	public void setAfterNumberInvocations(int afterNumberInvocations) {
@@ -70,14 +73,17 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 	}
 
 	@Override
-	public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(
+		CheckpointedStateScope scope) throws IOException {
 
-		this.lastCreatedStream = new BlockingCheckpointOutputStream(
+		BlockingCheckpointOutputStream blockingStream = new BlockingCheckpointOutputStream(
 			new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize),
 			waiter,
 			blocker,
 			afterNumberInvocations);
 
-		return lastCreatedStream;
+		allCreatedStreams.add(blockingStream);
+
+		return blockingStream;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 9958577..fe1a625 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -85,6 +85,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
@@ -327,14 +328,21 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 		task.cancel();
 		blockerCheckpointStreamFactory.getBlockerLatch().trigger();
 		testHarness.endInput();
-		Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
+
+		ExecutorService threadPool = task.getAsyncOperationsThreadPool();
+		threadPool.shutdown();
+		Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
+
+		Set<BlockingCheckpointOutputStream> createdStreams = blockerCheckpointStreamFactory.getAllCreatedStreams();
+
+		for (BlockingCheckpointOutputStream stream : createdStreams) {
+			Assert.assertTrue(
+				"Not all of the " + createdStreams.size() + " created streams have been closed.",
+				stream.isClosed());
+		}
 
 		try {
-			ExecutorService threadPool = task.getAsyncOperationsThreadPool();
-			threadPool.shutdown();
-			Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
 			testHarness.waitForTaskCompletion();
-
 			fail("Operation completed. Cancel failed.");
 		} catch (Exception expected) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9466bc3..f54d0e3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -341,7 +342,11 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 			assertNotNull(keyedStateHandle);
 			assertTrue(keyedStateHandle.getStateSize() > 0);
 			assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-			assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+
+			for (BlockingCheckpointOutputStream stream : testStreamFactory.getAllCreatedStreams()) {
+				assertTrue(stream.isClosed());
+			}
+
 			asyncSnapshotThread.join();
 			verifyRocksObjectsReleased();
 		} finally {
@@ -363,7 +368,11 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 			runStateUpdates();
 			snapshot.cancel(true);
 			blocker.trigger(); // allow checkpointing to start writing
-			assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+
+			for (BlockingCheckpointOutputStream stream : testStreamFactory.getAllCreatedStreams()) {
+				assertTrue(stream.isClosed());
+			}
+
 			waiter.await(); // wait for snapshot stream writing to run
 			try {
 				snapshot.get();