You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/10/20 04:27:12 UTC

flink git commit: [FLINK-5372] [tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

Repository: flink
Updated Branches:
  refs/heads/master 479be9d88 -> dbf4c865f


[FLINK-5372] [tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbf4c865
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbf4c865
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbf4c865

Branch: refs/heads/master
Commit: dbf4c865f712bcac3bf039ed5b10b9ae2e5809ce
Parents: 479be9d
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Oct 19 13:10:21 2017 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Oct 20 12:22:54 2017 +0800

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         | 151 ++++++++-----------
 .../state/RocksDBStateBackendConfigTest.java    |  45 ++++--
 .../apache/flink/util/ResourceGuardTest.java    |   2 +-
 .../util/BlockerCheckpointStreamFactory.java    |  16 +-
 4 files changed, 107 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 1f1b65a..b519b1a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -45,19 +46,20 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -69,7 +71,6 @@ import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
@@ -92,7 +93,7 @@ import static org.mockito.Mockito.verify;
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
 @SuppressWarnings("serial")
-public class RocksDBAsyncSnapshotTest {
+public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 	/**
 	 * This ensures that asynchronous state handles are actually materialized asynchronously.
@@ -209,7 +210,6 @@ public class RocksDBAsyncSnapshotTest {
 	 * @throws Exception
 	 */
 	@Test
-	@Ignore
 	public void testCancelFullyAsyncCheckpoints() throws Exception {
 		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
 
@@ -229,6 +229,29 @@ public class RocksDBAsyncSnapshotTest {
 
 		BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
 
+		BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
+			new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {
+
+			int count = 1;
+
+			@Override
+			public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(
+				long checkpointID,
+				long timestamp) throws Exception {
+
+				// we skip the first created stream, because it is used to checkpoint the timer service, which is
+				// currently not asynchronous.
+				if (count > 0) {
+					--count;
+					return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);
+				} else {
+					return super.createCheckpointStateOutputStream(checkpointID, timestamp);
+				}
+			}
+		};
+
+		BlockingStreamMemoryStateBackend.blockerCheckpointStreamFactory = blockerCheckpointStreamFactory;
+
 		RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
 		backend.setDbStoragePath(dbDir.getAbsolutePath());
 
@@ -244,8 +267,8 @@ public class RocksDBAsyncSnapshotTest {
 				new MockInputSplitProvider(),
 				testHarness.bufferSize);
 
-		BlockingStreamMemoryStateBackend.waitFirstWriteLatch = new OneShotLatch();
-		BlockingStreamMemoryStateBackend.unblockCancelLatch = new OneShotLatch();
+		blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch());
+		blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch());
 
 		testHarness.invoke(mockEnv);
 
@@ -259,39 +282,31 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
+		task.triggerCheckpoint(
+			new CheckpointMetaData(42, 17),
+			CheckpointOptions.forFullCheckpoint());
+
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
+		blockerCheckpointStreamFactory.getWaiterLatch().await();
 		task.cancel();
-		BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger();
+		blockerCheckpointStreamFactory.getBlockerLatch().trigger();
 		testHarness.endInput();
-		try {
+		Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
 
+		try {
 			ExecutorService threadPool = task.getAsyncOperationsThreadPool();
 			threadPool.shutdown();
 			Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
 			testHarness.waitForTaskCompletion();
 
-			if (mockEnv.wasFailedExternally()) {
-				throw new AsynchronousException(new InterruptedException("Exception was thrown as expected."));
-			}
 			fail("Operation completed. Cancel failed.");
 		} catch (Exception expected) {
-			AsynchronousException asynchronousException = null;
 
-			if (expected instanceof AsynchronousException) {
-				asynchronousException = (AsynchronousException) expected;
-			} else if (expected.getCause() instanceof AsynchronousException) {
-				asynchronousException = (AsynchronousException) expected.getCause();
-			} else {
+			Throwable cause = expected.getCause();
+
+			if (!(cause instanceof CancelTaskException)) {
 				fail("Unexpected exception: " + expected);
 			}
-
-			// we expect the exception from canceling snapshots
-			Throwable innerCause = asynchronousException.getCause();
-			Assert.assertTrue("Unexpected inner cause: " + innerCause,
-					innerCause instanceof CancellationException //future canceled
-							|| innerCause instanceof InterruptedException); //thread interrupted
 		}
 	}
 
@@ -329,25 +344,31 @@ public class RocksDBAsyncSnapshotTest {
 			new KeyGroupRange(0, 0),
 			null);
 
-		keyedStateBackend.restore(null);
+		try {
 
-		// register a state so that the state backend has to checkpoint something
-		keyedStateBackend.getPartitionedState(
-			"namespace",
-			StringSerializer.INSTANCE,
-			new ValueStateDescriptor<>("foobar", String.class));
+			keyedStateBackend.restore(null);
 
-		RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
-			checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
+			// register a state so that the state backend has to checkpoint something
+			keyedStateBackend.getPartitionedState(
+				"namespace",
+				StringSerializer.INSTANCE,
+				new ValueStateDescriptor<>("foobar", String.class));
 
-		try {
-			FutureUtil.runIfNotDoneAndGet(snapshotFuture);
-			fail("Expected an exception to be thrown here.");
-		} catch (ExecutionException e) {
-			Assert.assertEquals(testException, e.getCause());
-		}
+			RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
+				checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
 
-		verify(outputStream).close();
+			try {
+				FutureUtil.runIfNotDoneAndGet(snapshotFuture);
+				fail("Expected an exception to be thrown here.");
+			} catch (ExecutionException e) {
+				Assert.assertEquals(testException, e.getCause());
+			}
+
+			verify(outputStream).close();
+		} finally {
+			IOUtils.closeQuietly(keyedStateBackend);
+			keyedStateBackend.dispose();
+		}
 	}
 
 	@Test
@@ -379,55 +400,11 @@ public class RocksDBAsyncSnapshotTest {
 	 */
 	static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {
 
-		public static volatile OneShotLatch waitFirstWriteLatch = null;
-
-		public static volatile OneShotLatch unblockCancelLatch = null;
-
-		private volatile boolean closed = false;
+		public static volatile BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = null;
 
 		@Override
 		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
-			return new MemCheckpointStreamFactory(4 * 1024 * 1024) {
-				@Override
-				public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-
-					return new MemoryCheckpointOutputStream(4 * 1024 * 1024) {
-						@Override
-						public void write(int b) throws IOException {
-							waitFirstWriteLatch.trigger();
-							try {
-								unblockCancelLatch.await();
-							} catch (InterruptedException e) {
-								Thread.currentThread().interrupt();
-							}
-							if (closed) {
-								throw new IOException("Stream closed.");
-							}
-							super.write(b);
-						}
-
-						@Override
-						public void write(byte[] b, int off, int len) throws IOException {
-							waitFirstWriteLatch.trigger();
-							try {
-								unblockCancelLatch.await();
-							} catch (InterruptedException e) {
-								Thread.currentThread().interrupt();
-							}
-							if (closed) {
-								throw new IOException("Stream closed.");
-							}
-							super.write(b, off, len);
-						}
-
-						@Override
-						public void close() {
-							closed = true;
-							super.close();
-						}
-					};
-				}
-			};
+			return blockerCheckpointStreamFactory;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8ec29e2..853d80f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -103,12 +105,17 @@ public class RocksDBStateBackendConfigTest {
 						new KeyGroupRange(0, 0),
 						env.getTaskKvStateRegistry());
 
-		File instanceBasePath = keyedBackend.getInstanceBasePath();
-		assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
+		try {
+			File instanceBasePath = keyedBackend.getInstanceBasePath();
+			assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
 
-		//noinspection NullArgumentToVariableArgMethod
-		rocksDbBackend.setDbStoragePaths(null);
-		assertNull(rocksDbBackend.getDbStoragePaths());
+			//noinspection NullArgumentToVariableArgMethod
+			rocksDbBackend.setDbStoragePaths(null);
+			assertNull(rocksDbBackend.getDbStoragePaths());
+		} finally {
+			IOUtils.closeQuietly(keyedBackend);
+			keyedBackend.dispose();
+		}
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -158,8 +165,13 @@ public class RocksDBStateBackendConfigTest {
 						new KeyGroupRange(0, 0),
 						env.getTaskKvStateRegistry());
 
-		File instanceBasePath = keyedBackend.getInstanceBasePath();
-		assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
+		try {
+			File instanceBasePath = keyedBackend.getInstanceBasePath();
+			assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
+		} finally {
+			IOUtils.closeQuietly(keyedBackend);
+			keyedBackend.dispose();
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -225,14 +237,17 @@ public class RocksDBStateBackendConfigTest {
 
 			try {
 				Environment env = getMockEnvironment();
-				rocksDbBackend.createKeyedStateBackend(
-						env,
-						env.getJobID(),
-						"foobar",
-						IntSerializer.INSTANCE,
-						1,
-						new KeyGroupRange(0, 0),
-						new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+				AbstractKeyedStateBackend<Integer> keyedStateBackend = rocksDbBackend.createKeyedStateBackend(
+					env,
+					env.getJobID(),
+					"foobar",
+					IntSerializer.INSTANCE,
+					1,
+					new KeyGroupRange(0, 0),
+					new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+
+				IOUtils.closeQuietly(keyedStateBackend);
+				keyedStateBackend.dispose();
 			}
 			catch (Exception e) {
 				e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
index a030a81..98aae4d 100644
--- a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java
@@ -24,7 +24,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class ResourceGuardTest {
+public class ResourceGuardTest extends TestLogger {
 
 	@Test
 	public void testClose() {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbf4c865/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 98e654f..2091e00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -33,10 +33,10 @@ import java.io.IOException;
 @Internal
 public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 
-	private final int maxSize;
-	private volatile int afterNumberInvocations;
-	private volatile OneShotLatch blocker;
-	private volatile OneShotLatch waiter;
+	protected final int maxSize;
+	protected volatile int afterNumberInvocations;
+	protected volatile OneShotLatch blocker;
+	protected volatile OneShotLatch waiter;
 
 	MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
 
@@ -60,6 +60,14 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 		this.waiter = latch;
 	}
 
+	public OneShotLatch getBlockerLatch() {
+		return blocker;
+	}
+
+	public OneShotLatch getWaiterLatch() {
+		return waiter;
+	}
+
 	@Override
 	public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
 		this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {