You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this text version.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/07 20:03:36 UTC

[2/2] flink git commit: [FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically

[FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8ffacb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8ffacb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8ffacb1

Branch: refs/heads/master
Commit: b8ffacb1b88690090120cdb2341c68b53dc167ba
Parents: 63c04a5
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun May 7 15:09:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 7 22:00:43 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |   6 +-
 .../state/RocksDBStateBackendTest.java          |   4 +-
 .../memory/MemCheckpointStreamFactory.java      |  22 ++--
 .../BlockerCheckpointStreamFactory.java         | 112 -------------------
 .../runtime/state/OperatorStateBackendTest.java |  12 +-
 .../util/BlockerCheckpointStreamFactory.java    |  48 +++++---
 6 files changed, 60 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 079ea13..3cb21ac 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -799,7 +799,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			try {
 				outputStream = checkpointStreamFactory
 					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+				closeableRegistry.registerClosable(outputStream);
 
 				KeyedBackendSerializationProxy serializationProxy =
 					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
@@ -807,14 +807,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				serializationProxy.write(out);
 
-				stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+				closeableRegistry.unregisterClosable(outputStream);
 				StreamStateHandle result = outputStream.closeAndGetHandle();
 				outputStream = null;
 
 				return result;
 			} finally {
 				if (outputStream != null) {
-					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+					closeableRegistry.unregisterClosable(outputStream);
 					outputStream.close();
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 99b71c5..9340455 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -308,8 +308,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		waiter.await(); // wait for snapshot to run
 		waiter.reset();
 		runStateUpdates();
-		blocker.trigger(); // allow checkpointing to start writing
 		snapshot.cancel(true);
+		blocker.trigger(); // allow checkpointing to start writing
 		assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
 		waiter.await(); // wait for snapshot stream writing to run
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 9b2b46f..3920ce8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
@@ -78,12 +79,13 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 
 		private final int maxSize;
 
-		private boolean closed;
+		private AtomicBoolean closed;
 
 		boolean isEmpty = true;
 
 		public MemoryCheckpointOutputStream(int maxSize) {
 			this.maxSize = maxSize;
+			this.closed = new AtomicBoolean(false);
 		}
 
 		@Override
@@ -110,8 +112,9 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 
 		@Override
 		public void close() {
-			closed = true;
-			os.reset();
+			if (closed.compareAndSet(false, true)) {
+				closeInternal();
+			}
 		}
 
 		@Override
@@ -128,7 +131,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 		}
 
 		public boolean isClosed() {
-			return closed;
+			return closed.get();
 		}
 
 		/**
@@ -137,15 +140,18 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 		 * @throws IOException Thrown if the size of the data exceeds the maximal
 		 */
 		public byte[] closeAndGetBytes() throws IOException {
-			if (!closed) {
+			if (closed.compareAndSet(false, true)) {
 				checkSize(os.size(), maxSize);
 				byte[] bytes = os.toByteArray();
-				close();
+				closeInternal();
 				return bytes;
-			}
-			else {
+			} else {
 				throw new IOException("stream has already been closed");
 			}
 		}
+
+		private void closeInternal() {
+			os.reset();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
deleted file mode 100644
index 6f892e2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
-
-import java.io.IOException;
-
-/**
- * A {@link CheckpointStreamFactory} for tests that creates streams that block on a latch to test concurrency in
- * checkpointing.
- */
-public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
-
-	private final int maxSize;
-	private int afterNumberInvocations;
-	private OneShotLatch blocker;
-	private OneShotLatch waiter;
-
-	MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
-
-	public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
-		return lastCreatedStream;
-	}
-
-	public BlockerCheckpointStreamFactory(int maxSize) {
-		this.maxSize = maxSize;
-	}
-
-	public void setAfterNumberInvocations(int afterNumberInvocations) {
-		this.afterNumberInvocations = afterNumberInvocations;
-	}
-
-	public void setBlockerLatch(OneShotLatch latch) {
-		this.blocker = latch;
-	}
-
-	public void setWaiterLatch(OneShotLatch latch) {
-		this.waiter = latch;
-	}
-
-	@Override
-	public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
-		waiter.trigger();
-		this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
-			private int afterNInvocations = afterNumberInvocations;
-			private final OneShotLatch streamBlocker = blocker;
-			private final OneShotLatch streamWaiter = waiter;
-
-			@Override
-			public void write(int b) throws IOException {
-
-				if (afterNInvocations > 0) {
-					--afterNInvocations;
-				}
-
-				if (0 == afterNInvocations && null != streamBlocker) {
-					try {
-						streamBlocker.await();
-					} catch (InterruptedException ignored) {
-					}
-				}
-				try {
-					super.write(b);
-				} catch (IOException ex) {
-					if (null != streamWaiter) {
-						streamWaiter.trigger();
-					}
-					throw ex;
-				}
-
-				if (0 == afterNInvocations && null != streamWaiter) {
-					streamWaiter.trigger();
-				}
-			}
-
-			@Override
-			public void close() {
-				super.close();
-				if (null != streamWaiter) {
-					streamWaiter.trigger();
-				}
-			}
-		};
-
-		return lastCreatedStream;
-	}
-
-	@Override
-	public void close() throws Exception {
-
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 50ca159..85b9eaf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,21 +21,21 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.CancellationException;
@@ -477,18 +477,20 @@ public class OperatorStateBackendTest {
 
 		executorService.submit(runnableFuture);
 
-		// wait until the async checkpoint is in the write code, then continue
+		// wait until the async checkpoint is in the stream's write code, then continue
 		waiterLatch.await();
 
+		// cancel the future, which should close the underlying stream
 		runnableFuture.cancel(true);
+		Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed());
 
+		// we allow the stream under test to proceed
 		blockerLatch.trigger();
 
 		try {
 			runnableFuture.get(60, TimeUnit.SECONDS);
 			Assert.fail();
 		} catch (CancellationException ignore) {
-
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 1e31490..98e654f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -71,31 +71,28 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 			@Override
 			public void write(int b) throws IOException {
 
-				if (null != waiter) {
-					waiter.trigger();
-				}
+				unblockWaiter();
 
 				if (afterNInvocations > 0) {
 					--afterNInvocations;
+				} else {
+					awaitBlocker();
 				}
 
-				if (0 == afterNInvocations && null != streamBlocker) {
-					try {
-						streamBlocker.await();
-					} catch (InterruptedException ignored) {
-					}
-				}
 				try {
 					super.write(b);
 				} catch (IOException ex) {
-					if (null != streamWaiter) {
-						streamWaiter.trigger();
-					}
+					unblockWaiter();
 					throw ex;
 				}
 
-				if (0 == afterNInvocations && null != streamWaiter) {
-					streamWaiter.trigger();
+				if (0 == afterNInvocations) {
+					unblockWaiter();
+				}
+
+				// We also check for close here, in case the underlying stream does not do this
+				if (isClosed()) {
+					throw new IOException("Stream closed.");
 				}
 			}
 
@@ -110,10 +107,33 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 			@Override
 			public void close() {
 				super.close();
+				// trigger all the latches, essentially all blocking ops on the stream should resume after close.
+				unblockAll();
+			}
+
+			private void unblockWaiter() {
 				if (null != streamWaiter) {
 					streamWaiter.trigger();
 				}
 			}
+
+			private void awaitBlocker() {
+				if (null != streamBlocker) {
+					try {
+						streamBlocker.await();
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			private void unblockAll() {
+				if (null != streamWaiter) {
+					streamWaiter.trigger();
+				}
+				if (null != streamBlocker) {
+					streamBlocker.trigger();
+				}
+			}
 		};
 
 		return lastCreatedStream;