You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/03 07:57:42 UTC
[2/2] flink git commit: [FLINK-5146] Improved resource cleanup in
RocksDB keyed state backend
[FLINK-5146] Improved resource cleanup in RocksDB keyed state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4f802dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4f802dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4f802dd
Branch: refs/heads/master
Commit: e4f802dd502f38b922f668c2813728d5511ca289
Parents: 6e98a93
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sat Nov 12 21:13:28 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Dec 2 18:10:13 2016 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 312 ++++++++++------
.../state/RocksDBStateBackendTest.java | 364 ++++++++++++++++++-
.../io/async/AbstractAsyncIOCallable.java | 2 +-
.../runtime/io/async/AsyncDoneCallback.java | 4 +-
.../async/AsyncStoppableTaskWithCallback.java | 8 +-
.../memory/MemCheckpointStreamFactory.java | 2 +-
.../api/operators/OperatorSnapshotResult.java | 9 +-
.../streaming/runtime/tasks/StreamTask.java | 67 ++--
.../flink/core/testutils/OneShotLatch.java | 9 +
9 files changed, 624 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index e498b34..bc5b17d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -61,7 +61,7 @@ import org.rocksdb.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -103,14 +103,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* asynchronous checkpoints and when disposing the DB. Otherwise, the asynchronous snapshot might try
* iterating over a disposed DB. After acquiring the lock, always first check if (db == null).
*/
- private final SerializableObject dbDisposeLock = new SerializableObject();
+ private final SerializableObject asyncSnapshotLock = new SerializableObject();
/**
* Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
* to store state. The different k/v states that we have don't each have their own RocksDB
* instance. They all write to this instance but to their own column family.
*/
- @GuardedBy("dbDisposeLock")
protected RocksDB db;
/**
@@ -136,7 +135,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
) throws Exception {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
-
this.operatorIdentifier = operatorIdentifier;
this.jobId = jobId;
this.columnOptions = columnFamilyOptions;
@@ -206,8 +204,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
}
- RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
- restoreOperation.doRestore(restoreState);
+ try {
+ RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
+ restoreOperation.doRestore(restoreState);
+ } catch (Exception ex) {
+ dispose();
+ throw ex;
+ }
}
/**
@@ -217,23 +220,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public void dispose() {
super.dispose();
- final RocksDB cleanupRockDBReference;
-
- // Acquire the log on dbDisposeLock, so that no ongoing snapshots access the db during cleanup
- synchronized (dbDisposeLock) {
+ // Acquire the lock, so that no ongoing snapshots access the db during cleanup
+ synchronized (asyncSnapshotLock) {
// IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as
// working on the disposed object results in SEGFAULTS. Other code has to check field #db for null
// and access it in a synchronized block that locks on #asyncSnapshotLock.
- cleanupRockDBReference = db;
- db = null;
- }
+ if (db != null) {
- // Dispose decoupled db
- if (cleanupRockDBReference != null) {
- for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
- column.f0.close();
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
+ column.f0.close();
+ }
+
+ kvStateInformation.clear();
+
+ db.close();
+ db = null;
}
- cleanupRockDBReference.close();
}
try {
@@ -252,10 +254,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
* be called by the same thread.
*
- * @param checkpointId The Id of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
+ * @param checkpointId The Id of the checkpoint.
+ * @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
- *
* @return Future to the state handle of the snapshot data.
* @throws Exception
*/
@@ -267,14 +268,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
long startTime = System.currentTimeMillis();
- if (kvStateInformation.isEmpty()) {
- LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + " . Returning null.");
- return new DoneFuture<>(null);
- }
-
final RocksDBSnapshotOperation snapshotOperation = new RocksDBSnapshotOperation(this, streamFactory);
// hold the db lock while operation on the db to guard us against async db disposal
- synchronized (dbDisposeLock) {
+ synchronized (asyncSnapshotLock) {
+
+ if (kvStateInformation.isEmpty()) {
+ LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
+ " . Returning null.");
+
+ return new DoneFuture<>(null);
+ }
+
if (db != null) {
snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
} else {
@@ -295,18 +299,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public KeyGroupsStateHandle performOperation() throws Exception {
long startTime = System.currentTimeMillis();
- try {
- // hold the db lock while operation on the db to guard us against async db disposal
- synchronized (dbDisposeLock) {
- if (db != null) {
- snapshotOperation.writeDBSnapshot();
- } else {
+ synchronized (asyncSnapshotLock) {
+ try {
+ // hold the db lock while operation on the db to guard us against async db disposal
+ if (db == null) {
throw new IOException("RocksDB closed.");
}
- }
- } finally {
- snapshotOperation.closeCheckpointStream();
+ snapshotOperation.writeDBSnapshot();
+
+ } finally {
+ snapshotOperation.closeCheckpointStream();
+ }
}
LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " +
@@ -315,15 +319,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return snapshotOperation.getSnapshotResultStateHandle();
}
- @Override
- public void done() {
+ private void releaseSnapshotOperationResources(boolean canceled) {
// hold the db lock while operation on the db to guard us against async db disposal
- synchronized (dbDisposeLock) {
- if (db != null) {
- snapshotOperation.releaseDBSnapshot();
- }
+ synchronized (asyncSnapshotLock) {
+ snapshotOperation.releaseSnapshotResources(canceled);
}
}
+
+ @Override
+ public void done(boolean canceled) {
+ releaseSnapshotOperationResources(canceled);
+ }
};
LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
@@ -348,14 +354,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private long checkpointTimeStamp;
private Snapshot snapshot;
+ private ReadOptions readOptions;
private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
private DataOutputView outputView;
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
private KeyGroupsStateHandle snapshotResultStateHandle;
-
-
- public RocksDBSnapshotOperation(
+ RocksDBSnapshotOperation(
RocksDBKeyedStateBackend<?> stateBackend,
CheckpointStreamFactory checkpointStreamFactory) {
@@ -397,7 +402,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @throws IOException
*/
public void writeDBSnapshot() throws IOException, InterruptedException {
- Preconditions.checkNotNull(snapshot, "No ongoing snapshot to write.");
+
+ if (null == snapshot) {
+ throw new IOException("No snapshot available. Might be released due to cancellation.");
+ }
+
Preconditions.checkNotNull(outStream, "No output stream to write snapshot.");
writeKVStateMetaData();
writeKVStateData();
@@ -412,6 +421,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (outStream != null) {
stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
+ } else {
+ snapshotResultStateHandle = null;
}
}
@@ -419,13 +430,36 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* 5) Release the snapshot object for RocksDB and clean up.
*
*/
- public void releaseDBSnapshot() {
- Preconditions.checkNotNull(snapshot, "No ongoing snapshot to release.");
- stateBackend.db.releaseSnapshot(snapshot);
- snapshot = null;
- outStream = null;
- outputView = null;
- kvStateIterators = null;
+ public void releaseSnapshotResources(boolean canceled) {
+ if (null != kvStateIterators) {
+ for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
+ kvStateIterator.f0.close();
+ }
+ kvStateIterators = null;
+ }
+
+ if (null != snapshot) {
+ if(null != stateBackend.db) {
+ stateBackend.db.releaseSnapshot(snapshot);
+ }
+ snapshot.close();
+ snapshot = null;
+ }
+
+ if (null != readOptions) {
+ readOptions.close();
+ readOptions = null;
+ }
+
+ if (canceled) {
+ try {
+ if (null != snapshotResultStateHandle) {
+ snapshotResultStateHandle.discardState();
+ }
+ } catch (Exception ignored) {
+ LOG.warn("Exception occurred during snapshot state handle cleanup: " + ignored);
+ }
+ }
}
/**
@@ -462,7 +496,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
InstantiationUtil.serializeObject(outStream, column.getValue().f1);
//retrieve iterator for this k/v states
- ReadOptions readOptions = new ReadOptions();
+ readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
@@ -472,59 +506,64 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void writeKVStateData() throws IOException, InterruptedException {
- RocksDBMergeIterator iterator = new RocksDBMergeIterator(kvStateIterators, stateBackend.keyGroupPrefixBytes);
-
byte[] previousKey = null;
byte[] previousValue = null;
- //preamble: setup with first key-group as our lookahead
- if (iterator.isValid()) {
- //begin first key-group by recording the offset
- keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
- //write the k/v-state id as metadata
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(iterator.kvStateId());
- previousKey = iterator.key();
- previousValue = iterator.value();
- iterator.next();
- }
+ List<Tuple2<RocksIterator, Integer>> kvStateIteratorsHandover = this.kvStateIterators;
+ this.kvStateIterators = null;
- //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
- while (iterator.isValid()) {
+ // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
+ try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
+ kvStateIteratorsHandover, stateBackend.keyGroupPrefixBytes)) {
- assert (!hasMetaDataFollowsFlag(previousKey));
+ //preamble: setup with first key-group as our lookahead
+ if (mergeIterator.isValid()) {
+ //begin first key-group by recording the offset
+ keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+ //write the k/v-state id as metadata
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(mergeIterator.kvStateId());
+ previousKey = mergeIterator.key();
+ previousValue = mergeIterator.value();
+ mergeIterator.next();
+ }
- //set signal in first key byte that meta data will follow in the stream after this k/v pair
- if (iterator.isNewKeyGroup() || iterator.isNewKeyValueState()) {
+ //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+ while (mergeIterator.isValid()) {
- //be cooperative and check for interruption from time to time in the hot loop
- checkInterrupted();
+ assert (!hasMetaDataFollowsFlag(previousKey));
- setMetaDataFollowsFlagInKey(previousKey);
- }
+ //set signal in first key byte that meta data will follow in the stream after this k/v pair
+ if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
- writeKeyValuePair(previousKey, previousValue);
+ //be cooperative and check for interruption from time to time in the hot loop
+ checkInterrupted();
- //write meta data if we have to
- if (iterator.isNewKeyGroup()) {
- //
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(END_OF_KEY_GROUP_MARK);
- //begin new key-group
- keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
- //write the kev-state
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(iterator.kvStateId());
- } else if (iterator.isNewKeyValueState()) {
- //write the k/v-state
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(iterator.kvStateId());
- }
+ setMetaDataFollowsFlagInKey(previousKey);
+ }
- //request next k/v pair
- previousKey = iterator.key();
- previousValue = iterator.value();
- iterator.next();
+ writeKeyValuePair(previousKey, previousValue);
+
+ //write meta data if we have to
+ if (mergeIterator.isNewKeyGroup()) {
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(END_OF_KEY_GROUP_MARK);
+ //begin new key-group
+ keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+ //write the k/v-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(mergeIterator.kvStateId());
+ } else if (mergeIterator.isNewKeyValueState()) {
+ //write the k/v-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(mergeIterator.kvStateId());
+ }
+
+ //request next k/v pair
+ previousKey = mergeIterator.key();
+ previousValue = mergeIterator.value();
+ mergeIterator.next();
+ }
}
//epilogue: write last key-group
@@ -540,11 +579,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException {
StreamStateHandle stateHandle = outStream.closeAndGetHandle();
outStream = null;
- if (stateHandle != null) {
- return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
- } else {
- throw new IOException("Output stream returned null on close.");
- }
+ return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
}
private void writeKeyValuePair(byte[] key, byte[] value) throws IOException {
@@ -566,7 +601,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static void checkInterrupted() throws InterruptedException {
if(Thread.currentThread().isInterrupted()) {
- throw new InterruptedException("Snapshot canceled.");
+ throw new InterruptedException("RocksDB snapshot interrupted.");
}
}
}
@@ -655,7 +690,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
//restore the empty columns for the k/v states through the metadata
for (int i = 0; i < numColumns; i++) {
- StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) InstantiationUtil.deserializeObject(
+ StateDescriptor<?, ?> stateDescriptor = InstantiationUtil.deserializeObject(
currentStateHandleInStream,
rocksDBKeyedStateBackend.userCodeClassLoader);
@@ -829,13 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public int getKvStateId() {
return kvStateId;
}
+
+ public void close() {
+ this.iterator.close();
+ }
}
/**
* Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
* The resulting iteration sequence is ordered by (key-group, kv-state).
*/
- static final class RocksDBMergeIterator {
+ static final class RocksDBMergeIterator implements Closeable {
private final PriorityQueue<MergeIterator> heap;
private final int keyGroupPrefixByteCount;
@@ -845,18 +884,29 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private MergeIterator currentSubIterator;
- RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws IOException {
+ private static final List<Comparator<MergeIterator>> COMPARATORS;
+
+ static {
+ int maxBytes = 4;
+ COMPARATORS = new ArrayList<>(maxBytes);
+ for (int i = 0; i < maxBytes; ++i) {
+ final int currentBytes = i;
+ COMPARATORS.add(new Comparator<MergeIterator>() {
+ @Override
+ public int compare(MergeIterator o1, MergeIterator o2) {
+ int arrayCmpRes = compareKeyGroupsForByteArrays(
+ o1.currentKey, o2.currentKey, currentBytes);
+ return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+ }
+ });
+ }
+ }
+
+ RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
Preconditions.checkNotNull(kvStateIterators);
this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
- Comparator<MergeIterator> iteratorComparator = new Comparator<MergeIterator>() {
- @Override
- public int compare(MergeIterator o1, MergeIterator o2) {
- int arrayCmpRes = compareKeyGroupsForByteArrays(
- o1.currentKey, o2.currentKey, keyGroupPrefixByteCount);
- return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
- }
- };
+ Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
if (kvStateIterators.size() > 0) {
this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
@@ -866,8 +916,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
rocksIterator.seekToFirst();
if (rocksIterator.isValid()) {
heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+ } else {
+ rocksIterator.close();
}
}
+
+ kvStateIterators.clear();
+
this.valid = !heap.isEmpty();
this.currentSubIterator = heap.poll();
} else {
@@ -901,14 +956,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
newKVState = currentSubIterator.getIterator() != rocksIterator;
detectNewKeyGroup(oldKey);
}
- } else if (heap.isEmpty()) {
- valid = false;
} else {
- currentSubIterator = heap.poll();
- newKVState = true;
- detectNewKeyGroup(oldKey);
- }
+ rocksIterator.close();
+ if (heap.isEmpty()) {
+ currentSubIterator = null;
+ valid = false;
+ } else {
+ currentSubIterator = heap.poll();
+ newKVState = true;
+ detectNewKeyGroup(oldKey);
+ }
+ }
}
private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
@@ -986,6 +1045,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
return 0;
}
+
+ @Override
+ public void close() {
+
+ if (null != currentSubIterator) {
+ currentSubIterator.close();
+ currentSubIterator = null;
+ }
+
+ for (MergeIterator iterator : heap) {
+ iterator.close();
+ }
+
+ heap.clear();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9d25434..314717b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -18,26 +18,72 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.OperatingSystem;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksObject;
+import org.rocksdb.Snapshot;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
/**
* Tests for the partitioned state part of {@link RocksDBStateBackend}.
*/
public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> {
+ private OneShotLatch blocker;
+ private OneShotLatch waiter;
+ private BlockerCheckpointStreamFactory testStreamFactory;
+ private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
+ private List<RocksObject> allCreatedCloseables;
+ private ValueState<Integer> testState1;
+ private ValueState<String> testState2;
+
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Before
- public void checkOperatingSystem() {
+ public void checkOS() throws Exception {
Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
}
@@ -49,4 +95,320 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
backend.setDbStoragePath(dbPath);
return backend;
}
+
+ public void setupRocksKeyedStateBackend() throws Exception {
+
+ blocker = new OneShotLatch();
+ waiter = new OneShotLatch();
+ testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+ testStreamFactory.setBlockerLatch(blocker);
+ testStreamFactory.setWaiterLatch(waiter);
+ testStreamFactory.setAfterNumberInvocations(100);
+
+ RocksDBStateBackend backend = getStateBackend();
+ Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+ keyedStateBackend = (RocksDBKeyedStateBackend<Integer>) backend.createKeyedStateBackend(
+ env,
+ new JobID(),
+ "Test",
+ IntSerializer.INSTANCE,
+ 2,
+ new KeyGroupRange(0, 1),
+ mock(TaskKvStateRegistry.class));
+
+ testState1 = keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new ValueStateDescriptor<>("TestState-1", Integer.class, 0));
+
+ testState2 = keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new ValueStateDescriptor<>("TestState-2", String.class, ""));
+
+ allCreatedCloseables = new ArrayList<>();
+
+ keyedStateBackend.db = spy(keyedStateBackend.db);
+
+ doAnswer(new Answer<Object>() {
+
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ RocksIterator rocksIterator = spy((RocksIterator) invocationOnMock.callRealMethod());
+ allCreatedCloseables.add(rocksIterator);
+ return rocksIterator;
+ }
+ }).when(keyedStateBackend.db).newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class));
+
+ doAnswer(new Answer<Object>() {
+
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Snapshot snapshot = spy((Snapshot) invocationOnMock.callRealMethod());
+ allCreatedCloseables.add(snapshot);
+ return snapshot;
+ }
+ }).when(keyedStateBackend.db).getSnapshot();
+
+ doAnswer(new Answer<Object>() {
+
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ColumnFamilyHandle snapshot = spy((ColumnFamilyHandle) invocationOnMock.callRealMethod());
+ allCreatedCloseables.add(snapshot);
+ return snapshot;
+ }
+ }).when(keyedStateBackend.db).createColumnFamily(any(ColumnFamilyDescriptor.class));
+
+ for (int i = 0; i < 100; ++i) {
+ keyedStateBackend.setCurrentKey(i);
+ testState1.update(4200 + i);
+ testState2.update("S-" + (4200 + i));
+ }
+ }
+
+ @Test
+ public void testRunningSnapshotAfterBackendClosed() throws Exception {
+ setupRocksKeyedStateBackend();
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+
+ RocksDB spyDB = keyedStateBackend.db;
+
+ verify(spyDB, times(1)).getSnapshot();
+ verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
+
+ this.keyedStateBackend.dispose();
+ verify(spyDB, times(1)).close();
+ assertEquals(null, keyedStateBackend.db);
+
+ //Ensure that no RocksObject has been closed yet
+ for (RocksObject rocksCloseable : allCreatedCloseables) {
+ verify(rocksCloseable, times(0)).close();
+ }
+
+ Thread asyncSnapshotThread = new Thread(snapshot);
+ asyncSnapshotThread.start();
+ try {
+ snapshot.get();
+ fail();
+ } catch (Exception ignored) {
+
+ }
+
+ asyncSnapshotThread.join();
+
+ //Ensure every RocksObject was closed exactly once
+ for (RocksObject rocksCloseable : allCreatedCloseables) {
+ verify(rocksCloseable, times(1)).close();
+ }
+
+ }
+
+ @Test
+ public void testReleasingSnapshotAfterBackendClosed() throws Exception {
+ setupRocksKeyedStateBackend();
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+
+ RocksDB spyDB = keyedStateBackend.db;
+
+ verify(spyDB, times(1)).getSnapshot();
+ verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
+
+ this.keyedStateBackend.dispose();
+ verify(spyDB, times(1)).close();
+ assertEquals(null, keyedStateBackend.db);
+
+ //Ensure that no RocksObject has been closed yet
+ for (RocksObject rocksCloseable : allCreatedCloseables) {
+ verify(rocksCloseable, times(0)).close();
+ }
+
+ snapshot.cancel(true);
+
+ //Ensure every RocksObject was closed exactly once
+ for (RocksObject rocksCloseable : allCreatedCloseables) {
+ verify(rocksCloseable, times(1)).close();
+ }
+
+ }
+
+ @Test
+ public void testDismissingSnapshot() throws Exception {
+ setupRocksKeyedStateBackend();
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ snapshot.cancel(true);
+ verifyRocksObjectsReleased();
+ }
+
+ @Test
+ public void testDismissingSnapshotNotRunnable() throws Exception {
+ setupRocksKeyedStateBackend();
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ snapshot.cancel(true);
+ Thread asyncSnapshotThread = new Thread(snapshot);
+ asyncSnapshotThread.start();
+ try {
+ snapshot.get();
+ fail();
+ } catch (Exception ignored) {
+
+ }
+ asyncSnapshotThread.join();
+ verifyRocksObjectsReleased();
+ }
+
+ @Test
+ public void testCompletingSnapshot() throws Exception {
+ setupRocksKeyedStateBackend();
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ Thread asyncSnapshotThread = new Thread(snapshot);
+ asyncSnapshotThread.start();
+ waiter.await(); // wait for snapshot to run
+ waiter.reset();
+ runStateUpdates();
+ blocker.trigger(); // allow checkpointing to start writing
+ waiter.await(); // wait for snapshot stream writing to run
+ KeyGroupsStateHandle keyGroupsStateHandle = snapshot.get();
+ assertNotNull(keyGroupsStateHandle);
+ assertTrue(keyGroupsStateHandle.getStateSize() > 0);
+ assertEquals(2, keyGroupsStateHandle.getNumberOfKeyGroups());
+ assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+ asyncSnapshotThread.join();
+ verifyRocksObjectsReleased();
+ }
+
+ @Test
+ public void testCancelRunningSnapshot() throws Exception {
+ setupRocksKeyedStateBackend();
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ Thread asyncSnapshotThread = new Thread(snapshot);
+ asyncSnapshotThread.start();
+ waiter.await(); // wait for snapshot to run
+ waiter.reset();
+ runStateUpdates();
+ blocker.trigger(); // allow checkpointing to start writing
+ snapshot.cancel(true);
+ assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+ waiter.await(); // wait for snapshot stream writing to run
+ try {
+ snapshot.get();
+ fail();
+ } catch (Exception ignored) {
+ }
+
+ verifyRocksObjectsReleased();
+ asyncSnapshotThread.join();
+ }
+
+ private void runStateUpdates() throws Exception{
+ for (int i = 50; i < 150; ++i) {
+ if (i % 10 == 0) {
+ Thread.sleep(1);
+ }
+ keyedStateBackend.setCurrentKey(i);
+ testState1.update(4200 + i);
+ testState2.update("S-" + (4200 + i));
+ }
+ }
+
+ private void verifyRocksObjectsReleased() {
+ //Ensure every RocksObject was closed exactly once
+ for (RocksObject rocksCloseable : allCreatedCloseables) {
+ verify(rocksCloseable, times(1)).close();
+ }
+
+ assertNotNull(null, keyedStateBackend.db);
+ RocksDB spyDB = keyedStateBackend.db;
+
+ verify(spyDB, times(1)).getSnapshot();
+ verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
+
+ keyedStateBackend.dispose();
+ verify(spyDB, times(1)).close();
+ assertEquals(null, keyedStateBackend.db);
+ }
+
+ static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
+
+ private final int maxSize;
+ private int afterNumberInvocations;
+ private OneShotLatch blocker;
+ private OneShotLatch waiter;
+
+ MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
+
+ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
+ return lastCreatedStream;
+ }
+
+ public BlockerCheckpointStreamFactory(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public void setAfterNumberInvocations(int afterNumberInvocations) {
+ this.afterNumberInvocations = afterNumberInvocations;
+ }
+
+ public void setBlockerLatch(OneShotLatch latch) {
+ this.blocker = latch;
+ }
+
+ public void setWaiterLatch(OneShotLatch latch) {
+ this.waiter = latch;
+ }
+
+ @Override
+ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+ waiter.trigger();
+ this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+ private int afterNInvocations = afterNumberInvocations;
+ private final OneShotLatch streamBlocker = blocker;
+ private final OneShotLatch streamWaiter = waiter;
+
+ @Override
+ public void write(int b) throws IOException {
+
+ if (afterNInvocations > 0) {
+ --afterNInvocations;
+ }
+
+ if (0 == afterNInvocations && null != streamBlocker) {
+ try {
+ streamBlocker.await();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ try {
+ super.write(b);
+ } catch (IOException ex) {
+ if (null != streamWaiter) {
+ streamWaiter.trigger();
+ }
+ throw ex;
+ }
+
+ if (0 == afterNInvocations && null != streamWaiter) {
+ streamWaiter.trigger();
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ if (null != streamWaiter) {
+ streamWaiter.trigger();
+ }
+ }
+ };
+
+ return lastCreatedStream;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
index 989e868..1968d40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
@@ -131,7 +131,7 @@ public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements
* it finished or was stopped.
*/
@Override
- public void done() {
+ public void done(boolean canceled) { // default is a no-op; the canceled flag is not used here
//optional callback hook
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
index 13d9057..dcc5525 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
@@ -25,7 +25,9 @@ public interface AsyncDoneCallback {
/**
* the callback
+ *
+ * @param canceled {@code true} if the task this callback belongs to was canceled, {@code false} otherwise
*/
- void done();
+ void done(boolean canceled);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
index 8316e4f..1ca109c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
@@ -36,17 +36,13 @@ public class AsyncStoppableTaskWithCallback<V> extends FutureTask<V> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
-
- if (mayInterruptIfRunning) {
- stoppableCallbackCallable.stop();
- }
-
+ stoppableCallbackCallable.stop(); // now stops the callable unconditionally, not only when interruption is requested
return super.cancel(mayInterruptIfRunning);
}
@Override
protected void done() {
- stoppableCallbackCallable.done();
+ stoppableCallbackCallable.done(isCancelled()); // pass on whether this task was canceled
}
public static <V> AsyncStoppableTaskWithCallback<V> from(StoppableCallbackCallable<V> callable) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 30de638..9b2b46f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -144,7 +144,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
return bytes;
}
else {
- throw new IllegalStateException("stream has already been closed");
+ throw new IOException("stream has already been closed");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 52c89f8..4265edc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -78,4 +78,23 @@ public class OperatorSnapshotResult {
public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
this.operatorStateRawFuture = operatorStateRawFuture;
}
-}
\ No newline at end of file
+
+ /**
+ * Cancels every snapshot future that has been set on this result, requesting
+ * interruption of running ones. Futures that are still {@code null} are skipped.
+ */
+ public void cancel() {
+ if (getKeyedStateManagedFuture() != null) {
+ getKeyedStateManagedFuture().cancel(true);
+ }
+ if (getOperatorStateManagedFuture() != null) {
+ getOperatorStateManagedFuture().cancel(true);
+ }
+ if (getKeyedStateRawFuture() != null) {
+ getKeyedStateRawFuture().cancel(true);
+ }
+ if (getOperatorStateRawFuture() != null) {
+ getOperatorStateRawFuture().cancel(true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6595901..fac37c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -942,9 +942,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@Override
public void close() {
- //TODO Handle other state futures in case we actually run them. Currently they are just DoneFutures.
- if (futureKeyedBackendStateHandles != null) {
- futureKeyedBackendStateHandles.cancel(true);
+ // cancel every snapshot collected in snapshotInProgressList so that ongoing snapshot operations are cleaned up/released
+ for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
+ snapshotResult.cancel();
}
}
}
@@ -985,35 +985,55 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
startSyncPartNano = System.nanoTime();
- for (StreamOperator<?> op : allOperators) {
+ boolean failed = true;
+ try {
- createStreamFactory(op);
- snapshotNonPartitionableState(op);
+ for (StreamOperator<?> op : allOperators) {
- OperatorSnapshotResult snapshotInProgress =
- op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);
+ createStreamFactory(op);
+ snapshotNonPartitionableState(op);
- snapshotInProgressList.add(snapshotInProgress);
- }
+ OperatorSnapshotResult snapshotInProgress =
+ op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
- checkpointMetaData.getCheckpointId(), owner.getName());
- }
+ snapshotInProgressList.add(snapshotInProgress);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
+ checkpointMetaData.getCheckpointId(), owner.getName());
+ }
+
+ startAsyncPartNano = System.nanoTime();
- startAsyncPartNano= System.nanoTime();
+ checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
- checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
+ // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
+ runAsyncCheckpointingAndAcknowledge();
+ failed = false;
- runAsyncCheckpointingAndAcknowledge();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} - finished synchronous part of checkpoint {}." +
+ "Alignment duration: {} ms, snapshot duration {} ms",
+ owner.getName(), checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+ checkpointMetaData.getSyncDurationMillis());
+ }
+ } finally {
+ if (failed) {
+ // Cleanup to release resources
+ for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+ operatorSnapshotResult.cancel();
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} - finished synchronous part of checkpoint {}." +
- "Alignment duration: {} ms, snapshot duration {} ms",
- owner.getName(), checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
- checkpointMetaData.getSyncDurationMillis());
+ if (LOG.isDebugEnabled()) {
+ // exactly one placeholder per argument; durations are not reported on the failure path
+ LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.",
+ owner.getName(), checkpointMetaData.getCheckpointId());
+ }
+ }
}
+
}
private void createStreamFactory(StreamOperator<?> operator) throws IOException {
@@ -1051,6 +1071,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
public void runAsyncCheckpointingAndAcknowledge() throws IOException {
+
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
nonPartitionedStates,
http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 0418bf5..b3d86e5 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -107,4 +107,13 @@ public final class OneShotLatch {
public boolean isTriggered() {
return triggered;
}
+
+ /**
+ * Resets this latch to the untriggered state by clearing the {@code triggered} flag while holding {@code lock}.
+ */
+ public void reset() {
+ synchronized (lock) {
+ triggered = false;
+ }
+ }
}