You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/24 09:52:11 UTC
flink git commit: [FLINK-5372][tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
Repository: flink
Updated Branches:
refs/heads/master 614b1e29e -> 34254886f
[FLINK-5372][tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34254886
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34254886
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34254886
Branch: refs/heads/master
Commit: 34254886fb51c2f25a025077e59a091c2701fb04
Parents: 614b1e2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Apr 23 17:25:46 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Apr 24 11:52:02 2018 +0200
----------------------------------------------------------------------
.../runtime/state/OperatorStateBackendTest.java | 6 +++++-
.../util/BlockerCheckpointStreamFactory.java | 18 ++++++++++++------
.../streaming/state/RocksDBAsyncSnapshotTest.java | 18 +++++++++++++-----
.../streaming/state/RocksDBStateBackendTest.java | 13 +++++++++++--
4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index d6d4af7..16e5188 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableL
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
@@ -834,7 +835,10 @@ public class OperatorStateBackendTest {
// cancel the future, which should close the underlying stream
runnableFuture.cancel(true);
- Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed());
+
+ for (BlockingCheckpointOutputStream stream : streamFactory.getAllCreatedStreams()) {
+ Assert.assertTrue(stream.isClosed());
+ }
// we allow the stream under test to proceed
blockerLatch.trigger();
http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index ce41b10..b1bb0ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
/**
* {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO.
@@ -39,14 +41,15 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
protected volatile OneShotLatch blocker;
protected volatile OneShotLatch waiter;
- BlockingCheckpointOutputStream lastCreatedStream;
+ protected final Set<BlockingCheckpointOutputStream> allCreatedStreams;
- public BlockingCheckpointOutputStream getLastCreatedStream() {
- return lastCreatedStream;
+ public Set<BlockingCheckpointOutputStream> getAllCreatedStreams() {
+ return allCreatedStreams;
}
public BlockerCheckpointStreamFactory(int maxSize) {
this.maxSize = maxSize;
+ this.allCreatedStreams = new HashSet<>();
}
public void setAfterNumberInvocations(int afterNumberInvocations) {
@@ -70,14 +73,17 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
}
@Override
- public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(
+ CheckpointedStateScope scope) throws IOException {
- this.lastCreatedStream = new BlockingCheckpointOutputStream(
+ BlockingCheckpointOutputStream blockingStream = new BlockingCheckpointOutputStream(
new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize),
waiter,
blocker,
afterNumberInvocations);
- return lastCreatedStream;
+ allCreatedStreams.add(blockingStream);
+
+ return blockingStream;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 9958577..fe1a625 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -85,6 +85,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
@@ -327,14 +328,21 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
task.cancel();
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
- Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
+
+ ExecutorService threadPool = task.getAsyncOperationsThreadPool();
+ threadPool.shutdown();
+ Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
+
+ Set<BlockingCheckpointOutputStream> createdStreams = blockerCheckpointStreamFactory.getAllCreatedStreams();
+
+ for (BlockingCheckpointOutputStream stream : createdStreams) {
+ Assert.assertTrue(
+ "Not all of the " + createdStreams.size() + " created streams have been closed.",
+ stream.isClosed());
+ }
try {
- ExecutorService threadPool = task.getAsyncOperationsThreadPool();
- threadPool.shutdown();
- Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
testHarness.waitForTaskCompletion();
-
fail("Operation completed. Cancel failed.");
} catch (Exception expected) {
http://git-wip-us.apache.org/repos/asf/flink/blob/34254886/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9466bc3..f54d0e3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -341,7 +342,11 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
assertNotNull(keyedStateHandle);
assertTrue(keyedStateHandle.getStateSize() > 0);
assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
- assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+
+ for (BlockingCheckpointOutputStream stream : testStreamFactory.getAllCreatedStreams()) {
+ assertTrue(stream.isClosed());
+ }
+
asyncSnapshotThread.join();
verifyRocksObjectsReleased();
} finally {
@@ -363,7 +368,11 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
runStateUpdates();
snapshot.cancel(true);
blocker.trigger(); // allow checkpointing to start writing
- assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+
+ for (BlockingCheckpointOutputStream stream : testStreamFactory.getAllCreatedStreams()) {
+ assertTrue(stream.isClosed());
+ }
+
waiter.await(); // wait for snapshot stream writing to run
try {
snapshot.get();