You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/07 20:03:36 UTC
[2/2] flink git commit: [FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically
[FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8ffacb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8ffacb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8ffacb1
Branch: refs/heads/master
Commit: b8ffacb1b88690090120cdb2341c68b53dc167ba
Parents: 63c04a5
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun May 7 15:09:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 7 22:00:43 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 6 +-
.../state/RocksDBStateBackendTest.java | 4 +-
.../memory/MemCheckpointStreamFactory.java | 22 ++--
.../BlockerCheckpointStreamFactory.java | 112 -------------------
.../runtime/state/OperatorStateBackendTest.java | 12 +-
.../util/BlockerCheckpointStreamFactory.java | 48 +++++---
6 files changed, 60 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 079ea13..3cb21ac 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -799,7 +799,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
- stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+ closeableRegistry.registerClosable(outputStream);
KeyedBackendSerializationProxy serializationProxy =
new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
@@ -807,14 +807,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.write(out);
- stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ closeableRegistry.unregisterClosable(outputStream);
StreamStateHandle result = outputStream.closeAndGetHandle();
outputStream = null;
return result;
} finally {
if (outputStream != null) {
- stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ closeableRegistry.unregisterClosable(outputStream);
outputStream.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 99b71c5..9340455 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -308,8 +308,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
waiter.await(); // wait for snapshot to run
waiter.reset();
runStateUpdates();
- blocker.trigger(); // allow checkpointing to start writing
snapshot.cancel(true);
+ blocker.trigger(); // allow checkpointing to start writing
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
waiter.await(); // wait for snapshot stream writing to run
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 9b2b46f..3920ce8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import java.io.IOException;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
@@ -78,12 +79,13 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
private final int maxSize;
- private boolean closed;
+ private AtomicBoolean closed;
boolean isEmpty = true;
public MemoryCheckpointOutputStream(int maxSize) {
this.maxSize = maxSize;
+ this.closed = new AtomicBoolean(false);
}
@Override
@@ -110,8 +112,9 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
@Override
public void close() {
- closed = true;
- os.reset();
+ if (closed.compareAndSet(false, true)) {
+ closeInternal();
+ }
}
@Override
@@ -128,7 +131,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
}
public boolean isClosed() {
- return closed;
+ return closed.get();
}
/**
@@ -137,15 +140,18 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
* @throws IOException Thrown if the size of the data exceeds the maximal
*/
public byte[] closeAndGetBytes() throws IOException {
- if (!closed) {
+ if (closed.compareAndSet(false, true)) {
checkSize(os.size(), maxSize);
byte[] bytes = os.toByteArray();
- close();
+ closeInternal();
return bytes;
- }
- else {
+ } else {
throw new IOException("stream has already been closed");
}
}
+
+ private void closeInternal() {
+ os.reset();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
deleted file mode 100644
index 6f892e2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
-
-import java.io.IOException;
-
-/**
- * A {@link CheckpointStreamFactory} for tests that creates streams that block on a latch to test concurrency in
- * checkpointing.
- */
-public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
-
- private final int maxSize;
- private int afterNumberInvocations;
- private OneShotLatch blocker;
- private OneShotLatch waiter;
-
- MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
-
- public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
- return lastCreatedStream;
- }
-
- public BlockerCheckpointStreamFactory(int maxSize) {
- this.maxSize = maxSize;
- }
-
- public void setAfterNumberInvocations(int afterNumberInvocations) {
- this.afterNumberInvocations = afterNumberInvocations;
- }
-
- public void setBlockerLatch(OneShotLatch latch) {
- this.blocker = latch;
- }
-
- public void setWaiterLatch(OneShotLatch latch) {
- this.waiter = latch;
- }
-
- @Override
- public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
- waiter.trigger();
- this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
- private int afterNInvocations = afterNumberInvocations;
- private final OneShotLatch streamBlocker = blocker;
- private final OneShotLatch streamWaiter = waiter;
-
- @Override
- public void write(int b) throws IOException {
-
- if (afterNInvocations > 0) {
- --afterNInvocations;
- }
-
- if (0 == afterNInvocations && null != streamBlocker) {
- try {
- streamBlocker.await();
- } catch (InterruptedException ignored) {
- }
- }
- try {
- super.write(b);
- } catch (IOException ex) {
- if (null != streamWaiter) {
- streamWaiter.trigger();
- }
- throw ex;
- }
-
- if (0 == afterNInvocations && null != streamWaiter) {
- streamWaiter.trigger();
- }
- }
-
- @Override
- public void close() {
- super.close();
- if (null != streamWaiter) {
- streamWaiter.trigger();
- }
- }
- };
-
- return lastCreatedStream;
- }
-
- @Override
- public void close() throws Exception {
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 50ca159..85b9eaf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,21 +21,21 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.util.FutureUtil;
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
@@ -477,18 +477,20 @@ public class OperatorStateBackendTest {
executorService.submit(runnableFuture);
- // wait until the async checkpoint is in the write code, then continue
+ // wait until the async checkpoint is in the stream's write code, then continue
waiterLatch.await();
+ // cancel the future, which should close the underlying stream
runnableFuture.cancel(true);
+ Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed());
+ // we allow the stream under test to proceed
blockerLatch.trigger();
try {
runnableFuture.get(60, TimeUnit.SECONDS);
Assert.fail();
} catch (CancellationException ignore) {
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 1e31490..98e654f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -71,31 +71,28 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
@Override
public void write(int b) throws IOException {
- if (null != waiter) {
- waiter.trigger();
- }
+ unblockWaiter();
if (afterNInvocations > 0) {
--afterNInvocations;
+ } else {
+ awaitBlocker();
}
- if (0 == afterNInvocations && null != streamBlocker) {
- try {
- streamBlocker.await();
- } catch (InterruptedException ignored) {
- }
- }
try {
super.write(b);
} catch (IOException ex) {
- if (null != streamWaiter) {
- streamWaiter.trigger();
- }
+ unblockWaiter();
throw ex;
}
- if (0 == afterNInvocations && null != streamWaiter) {
- streamWaiter.trigger();
+ if (0 == afterNInvocations) {
+ unblockWaiter();
+ }
+
+ // We also check for close here, in case the underlying stream does not do this
+ if (isClosed()) {
+ throw new IOException("Stream closed.");
}
}
@@ -110,10 +107,33 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
@Override
public void close() {
super.close();
+ // trigger all the latches, essentially all blocking ops on the stream should resume after close.
+ unblockAll();
+ }
+
+ private void unblockWaiter() {
if (null != streamWaiter) {
streamWaiter.trigger();
}
}
+
+ private void awaitBlocker() {
+ if (null != streamBlocker) {
+ try {
+ streamBlocker.await();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ private void unblockAll() {
+ if (null != streamWaiter) {
+ streamWaiter.trigger();
+ }
+ if (null != streamBlocker) {
+ streamBlocker.trigger();
+ }
+ }
};
return lastCreatedStream;