You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/10/20 04:27:12 UTC
flink git commit: [FLINK-5372] [tests] Fix
RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
Repository: flink
Updated Branches:
refs/heads/master 479be9d88 -> dbf4c865f
[FLINK-5372] [tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbf4c865
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbf4c865
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbf4c865
Branch: refs/heads/master
Commit: dbf4c865f712bcac3bf039ed5b10b9ae2e5809ce
Parents: 479be9d
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Oct 19 13:10:21 2017 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Oct 20 12:22:54 2017 +0800
----------------------------------------------------------------------
.../state/RocksDBAsyncSnapshotTest.java | 151 ++++++++-----------
.../state/RocksDBStateBackendConfigTest.java | 45 ++++--
.../apache/flink/util/ResourceGuardTest.java | 2 +-
.../util/BlockerCheckpointStreamFactory.java | 16 +-
4 files changed, 107 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 1f1b65a..b519b1a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -45,19 +46,20 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -69,7 +71,6 @@ import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
@@ -92,7 +93,7 @@ import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
@SuppressWarnings("serial")
-public class RocksDBAsyncSnapshotTest {
+public class RocksDBAsyncSnapshotTest extends TestLogger {
/**
* This ensures that asynchronous state handles are actually materialized asynchronously.
@@ -209,7 +210,6 @@ public class RocksDBAsyncSnapshotTest {
* @throws Exception
*/
@Test
- @Ignore
public void testCancelFullyAsyncCheckpoints() throws Exception {
final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
@@ -229,6 +229,29 @@ public class RocksDBAsyncSnapshotTest {
BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
+ BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
+ new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {
+
+ int count = 1;
+
+ @Override
+ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(
+ long checkpointID,
+ long timestamp) throws Exception {
+
+ // we skip the first created stream, because it is used to checkpoint the timer service, which is
+ // currently not asynchronous.
+ if (count > 0) {
+ --count;
+ return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);
+ } else {
+ return super.createCheckpointStateOutputStream(checkpointID, timestamp);
+ }
+ }
+ };
+
+ BlockingStreamMemoryStateBackend.blockerCheckpointStreamFactory = blockerCheckpointStreamFactory;
+
RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
backend.setDbStoragePath(dbDir.getAbsolutePath());
@@ -244,8 +267,8 @@ public class RocksDBAsyncSnapshotTest {
new MockInputSplitProvider(),
testHarness.bufferSize);
- BlockingStreamMemoryStateBackend.waitFirstWriteLatch = new OneShotLatch();
- BlockingStreamMemoryStateBackend.unblockCancelLatch = new OneShotLatch();
+ blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch());
+ blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch());
testHarness.invoke(mockEnv);
@@ -259,39 +282,31 @@ public class RocksDBAsyncSnapshotTest {
}
}
- task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
+ task.triggerCheckpoint(
+ new CheckpointMetaData(42, 17),
+ CheckpointOptions.forFullCheckpoint());
+
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
- BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
+ blockerCheckpointStreamFactory.getWaiterLatch().await();
task.cancel();
- BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger();
+ blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
- try {
+ Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
+ try {
ExecutorService threadPool = task.getAsyncOperationsThreadPool();
threadPool.shutdown();
Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
testHarness.waitForTaskCompletion();
- if (mockEnv.wasFailedExternally()) {
- throw new AsynchronousException(new InterruptedException("Exception was thrown as expected."));
- }
fail("Operation completed. Cancel failed.");
} catch (Exception expected) {
- AsynchronousException asynchronousException = null;
- if (expected instanceof AsynchronousException) {
- asynchronousException = (AsynchronousException) expected;
- } else if (expected.getCause() instanceof AsynchronousException) {
- asynchronousException = (AsynchronousException) expected.getCause();
- } else {
+ Throwable cause = expected.getCause();
+
+ if (!(cause instanceof CancelTaskException)) {
fail("Unexpected exception: " + expected);
}
-
- // we expect the exception from canceling snapshots
- Throwable innerCause = asynchronousException.getCause();
- Assert.assertTrue("Unexpected inner cause: " + innerCause,
- innerCause instanceof CancellationException //future canceled
- || innerCause instanceof InterruptedException); //thread interrupted
}
}
@@ -329,25 +344,31 @@ public class RocksDBAsyncSnapshotTest {
new KeyGroupRange(0, 0),
null);
- keyedStateBackend.restore(null);
+ try {
- // register a state so that the state backend has to checkpoint something
- keyedStateBackend.getPartitionedState(
- "namespace",
- StringSerializer.INSTANCE,
- new ValueStateDescriptor<>("foobar", String.class));
+ keyedStateBackend.restore(null);
- RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
- checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
+ // register a state so that the state backend has to checkpoint something
+ keyedStateBackend.getPartitionedState(
+ "namespace",
+ StringSerializer.INSTANCE,
+ new ValueStateDescriptor<>("foobar", String.class));
- try {
- FutureUtil.runIfNotDoneAndGet(snapshotFuture);
- fail("Expected an exception to be thrown here.");
- } catch (ExecutionException e) {
- Assert.assertEquals(testException, e.getCause());
- }
+ RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
+ checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
- verify(outputStream).close();
+ try {
+ FutureUtil.runIfNotDoneAndGet(snapshotFuture);
+ fail("Expected an exception to be thrown here.");
+ } catch (ExecutionException e) {
+ Assert.assertEquals(testException, e.getCause());
+ }
+
+ verify(outputStream).close();
+ } finally {
+ IOUtils.closeQuietly(keyedStateBackend);
+ keyedStateBackend.dispose();
+ }
}
@Test
@@ -379,55 +400,11 @@ public class RocksDBAsyncSnapshotTest {
*/
static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {
- public static volatile OneShotLatch waitFirstWriteLatch = null;
-
- public static volatile OneShotLatch unblockCancelLatch = null;
-
- private volatile boolean closed = false;
+ public static volatile BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = null;
@Override
public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
- return new MemCheckpointStreamFactory(4 * 1024 * 1024) {
- @Override
- public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-
- return new MemoryCheckpointOutputStream(4 * 1024 * 1024) {
- @Override
- public void write(int b) throws IOException {
- waitFirstWriteLatch.trigger();
- try {
- unblockCancelLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (closed) {
- throw new IOException("Stream closed.");
- }
- super.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- waitFirstWriteLatch.trigger();
- try {
- unblockCancelLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (closed) {
- throw new IOException("Stream closed.");
- }
- super.write(b, off, len);
- }
-
- @Override
- public void close() {
- closed = true;
- super.close();
- }
- };
- }
- };
+ return blockerCheckpointStreamFactory;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8ec29e2..853d80f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -103,12 +105,17 @@ public class RocksDBStateBackendConfigTest {
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());
- File instanceBasePath = keyedBackend.getInstanceBasePath();
- assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
+ try {
+ File instanceBasePath = keyedBackend.getInstanceBasePath();
+ assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
- //noinspection NullArgumentToVariableArgMethod
- rocksDbBackend.setDbStoragePaths(null);
- assertNull(rocksDbBackend.getDbStoragePaths());
+ //noinspection NullArgumentToVariableArgMethod
+ rocksDbBackend.setDbStoragePaths(null);
+ assertNull(rocksDbBackend.getDbStoragePaths());
+ } finally {
+ IOUtils.closeQuietly(keyedBackend);
+ keyedBackend.dispose();
+ }
}
@Test(expected = IllegalArgumentException.class)
@@ -158,8 +165,13 @@ public class RocksDBStateBackendConfigTest {
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());
- File instanceBasePath = keyedBackend.getInstanceBasePath();
- assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
+ try {
+ File instanceBasePath = keyedBackend.getInstanceBasePath();
+ assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
+ } finally {
+ IOUtils.closeQuietly(keyedBackend);
+ keyedBackend.dispose();
+ }
}
// ------------------------------------------------------------------------
@@ -225,14 +237,17 @@ public class RocksDBStateBackendConfigTest {
try {
Environment env = getMockEnvironment();
- rocksDbBackend.createKeyedStateBackend(
- env,
- env.getJobID(),
- "foobar",
- IntSerializer.INSTANCE,
- 1,
- new KeyGroupRange(0, 0),
- new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+ AbstractKeyedStateBackend<Integer> keyedStateBackend = rocksDbBackend.createKeyedStateBackend(
+ env,
+ env.getJobID(),
+ "foobar",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+
+ IOUtils.closeQuietly(keyedStateBackend);
+ keyedStateBackend.dispose();
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
index a030a81..98aae4d 100644
--- a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
@@ -24,7 +24,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
-public class ResourceGuardTest {
+public class ResourceGuardTest extends TestLogger {
@Test
public void testClose() {
http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 98e654f..2091e00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -33,10 +33,10 @@ import java.io.IOException;
@Internal
public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
- private final int maxSize;
- private volatile int afterNumberInvocations;
- private volatile OneShotLatch blocker;
- private volatile OneShotLatch waiter;
+ protected final int maxSize;
+ protected volatile int afterNumberInvocations;
+ protected volatile OneShotLatch blocker;
+ protected volatile OneShotLatch waiter;
MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
@@ -60,6 +60,14 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
this.waiter = latch;
}
+ public OneShotLatch getBlockerLatch() {
+ return blocker;
+ }
+
+ public OneShotLatch getWaiterLatch() {
+ return waiter;
+ }
+
@Override
public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {