You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:23 UTC
[05/27] flink git commit: [FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware
[FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware
This change makes the RocksDB backend key-group aware by building on the
changes in the previous commit.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/addd0842
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/addd0842
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/addd0842
Branch: refs/heads/master
Commit: addd0842f9d74b05fc44374c6682b6e697603939
Parents: 4809f53
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Aug 17 14:50:18 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Aug 31 19:10:01 2016 +0200
----------------------------------------------------------------------
.../streaming/state/AbstractRocksDBState.java | 98 ++-
.../streaming/state/RocksDBFoldingState.java | 25 +-
.../state/RocksDBKeyedStateBackend.java | 786 ++++++++++++++++++-
.../streaming/state/RocksDBListState.java | 22 +-
.../streaming/state/RocksDBReducingState.java | 24 +-
.../streaming/state/RocksDBStateBackend.java | 24 +-
.../streaming/state/RocksDBValueState.java | 27 +-
.../state/RocksDBAsyncKVSnapshotTest.java | 330 --------
.../state/RocksDBAsyncSnapshotTest.java | 397 ++++++++++
.../state/RocksDBMergeIteratorTest.java | 140 ++++
.../state/RocksDBStateBackendConfigTest.java | 690 ++++++++--------
.../io/async/AbstractAsyncIOCallable.java | 157 ++++
.../runtime/io/async/AsyncDoneCallback.java | 31 +
.../flink/runtime/io/async/AsyncStoppable.java | 47 ++
.../async/AsyncStoppableTaskWithCallback.java | 55 ++
.../io/async/StoppableCallbackCallable.java | 30 +
.../memory/MemCheckpointStreamFactory.java | 6 +-
.../streaming/runtime/tasks/StreamTask.java | 18 +
18 files changed, 2136 insertions(+), 771 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 710f506..cbc2757 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,8 +20,11 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
@@ -30,7 +33,6 @@ import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
@@ -56,7 +58,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
private N currentNamespace;
/** Backend that holds the actual RocksDB instance where we store state */
- protected RocksDBKeyedStateBackend backend;
+ protected RocksDBKeyedStateBackend<K> backend;
/** The column family of this particular instance of state */
protected ColumnFamilyHandle columnFamily;
@@ -69,14 +71,20 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
*/
private final WriteOptions writeOptions;
+ protected final ByteArrayOutputStreamWithPos keySerializationStream;
+ protected final DataOutputView keySerializationDateDataOutputView;
+
+ private final boolean ambiguousKeyPossible;
+
/**
* Creates a new RocksDB backed state.
* @param namespaceSerializer The serializer for the namespace.
*/
- protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
+ protected AbstractRocksDBState(
+ ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
SD stateDesc,
- RocksDBKeyedStateBackend backend) {
+ RocksDBKeyedStateBackend<K> backend) {
this.namespaceSerializer = namespaceSerializer;
this.backend = backend;
@@ -85,31 +93,27 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
writeOptions = new WriteOptions();
writeOptions.setDisableWAL(true);
-
this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
+
+ this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
+ this.keySerializationDateDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
+ this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0)
+ && (namespaceSerializer.getLength() < 0);
}
// ------------------------------------------------------------------------
@Override
public void clear() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
backend.db.remove(columnFamily, writeOptions, key);
} catch (IOException|RocksDBException e) {
throw new RuntimeException("Error while removing entry from RocksDB", e);
}
}
- protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
- backend.getKeySerializer().serialize(backend.getCurrentKey(), out);
- out.writeByte(42);
- namespaceSerializer.serialize(currentNamespace, out);
- }
-
@Override
public void setCurrentNamespace(N namespace) {
this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
@@ -118,17 +122,67 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
@Override
@SuppressWarnings("unchecked")
public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- // Serialized key and namespace is expected to be of the same format
- // as writeKeyAndNamespace()
Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
- byte[] value = backend.db.get(columnFamily, serializedKeyAndNamespace);
+ //TODO make KvStateRequestSerializer key-group aware to save this round trip and key-group computation
+ Tuple2<K, N> des = KvStateRequestSerializer.<K, N>deserializeKeyAndNamespace(
+ serializedKeyAndNamespace,
+ backend.getKeySerializer(),
+ namespaceSerializer);
+
+ int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0);
+ writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
+ return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+ }
+
+ protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
+ writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
+ }
+
+ protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
+ keySerializationStream.reset();
+ writeKeyGroup(keyGroup);
+ writeKey(key);
+ writeNameSpace(namespace);
+ }
+
+ private void writeKeyGroup(int keyGroup) throws IOException {
+ for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
+ keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
+ }
+ }
+
+ private void writeKey(K key) throws IOException {
+ //write key
+ int beforeWrite = (int) keySerializationStream.getPosition();
+ backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);
+
+ if (ambiguousKeyPossible) {
+ //write size of key
+ writeLengthFrom(beforeWrite);
+ }
+ }
+
+ private void writeNameSpace(N namespace) throws IOException {
+ int beforeWrite = (int) keySerializationStream.getPosition();
+ namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);
- if (value != null) {
- return value;
- } else {
- return null;
+ if (ambiguousKeyPossible) {
+ //write length of namespace
+ writeLengthFrom(beforeWrite);
}
}
+ private void writeLengthFrom(int fromPosition) throws IOException {
+ int length = (int) (keySerializationStream.getPosition() - fromPosition);
+ writeVariableIntBytes(length);
+ }
+
+ private void writeVariableIntBytes(int value) throws IOException {
+ do {
+ keySerializationDateDataOutputView.writeByte(value);
+ value >>>= 8;
+ } while (value != 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 8c0799b..3018f7b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -29,7 +29,6 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
@@ -66,7 +65,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc,
- RocksDBKeyedStateBackend backend) {
+ RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, stateDesc, backend);
@@ -79,11 +78,9 @@ public class RocksDBFoldingState<K, N, T, ACC>
@Override
public ACC get() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return null;
@@ -96,23 +93,21 @@ public class RocksDBFoldingState<K, N, T, ACC>
@Override
public void add(T value) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
-
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
if (valueBytes == null) {
- baos.reset();
+ keySerializationStream.reset();
valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
- backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} else {
ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
ACC newValue = foldFunction.fold(oldValue, value);
- baos.reset();
+ keySerializationStream.reset();
valueSerializer.serialize(newValue, out);
- backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
}
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 63f1fa2..a1634b2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -29,30 +30,49 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.PriorityQueue;
+import java.util.concurrent.RunnableFuture;
/**
* A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
@@ -80,25 +100,29 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
private final File instanceRocksDBPath;
/**
+ * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
+ * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
+ * iterating over a disposed db.
+ */
+ private final SerializableObject dbDisposeLock = new SerializableObject();
+
+ /**
* Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
* to store state. The different k/v states that we have don't each have their own RocksDB
* instance. They all write to this instance but to their own column family.
*/
+ @GuardedBy("dbDisposeLock")
protected volatile RocksDB db;
/**
- * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
- * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
- * iterating over a disposed db.
- */
- private final SerializableObject dbCleanupLock = new SerializableObject();
-
- /**
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
+ /** Number of bytes required to prefix the key groups. */
+ private final int keyGroupPrefixBytes;
+
public RocksDBKeyedStateBackend(
JobID jobId,
String operatorIdentifier,
@@ -108,7 +132,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
- KeyGroupRange keyGroupRange
+ KeyGroupRange keyGroupRange
) throws Exception {
super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
@@ -147,35 +171,543 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
} catch (RocksDBException e) {
throw new RuntimeException("Error while opening RocksDB instance.", e);
}
-
+ keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
kvStateInformation = new HashMap<>();
}
+ public RocksDBKeyedStateBackend(
+ JobID jobId,
+ String operatorIdentifier,
+ File instanceBasePath,
+ DBOptions dbOptions,
+ ColumnFamilyOptions columnFamilyOptions,
+ TaskKvStateRegistry kvStateRegistry,
+ TypeSerializer<K> keySerializer,
+ KeyGroupAssigner<K> keyGroupAssigner,
+ KeyGroupRange keyGroupRange,
+ List<KeyGroupsStateHandle> restoreState
+ ) throws Exception {
+ this(
+ jobId,
+ operatorIdentifier,
+ instanceBasePath,
+ dbOptions,
+ columnFamilyOptions,
+ kvStateRegistry,
+ keySerializer,
+ keyGroupAssigner,
+ keyGroupRange);
+
+ LOG.info("Initializing RocksDB keyed state backend from snapshot.");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
+ }
+
+ RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
+ restoreOperation.doRestore(restoreState);
+ }
+
+ /**
+ * @see java.io.Closeable
+ *
+ * Should only be called by one thread.
+ *
+ * @throws Exception
+ */
@Override
public void close() throws Exception {
super.close();
- // we have to lock because we might have an asynchronous checkpoint going on
- synchronized (dbCleanupLock) {
- if (db != null) {
- for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
- column.f0.dispose();
- }
+ final RocksDB cleanupRockDBReference;
- db.dispose();
- db = null;
+ // Acquire the lock on dbDisposeLock, so that no ongoing snapshots access the db during cleanup
+ synchronized (dbDisposeLock) {
+ // IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as
+ // working on the disposed object results in SEGFAULTS. Other code has to check field #db for null
+ // and access it in a synchronized block that locks on #dbDisposeLock.
+ cleanupRockDBReference = db;
+ db = null;
+ }
+
+ // Dispose decoupled db
+ if (cleanupRockDBReference != null) {
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+ column.f0.dispose();
}
+ cleanupRockDBReference.dispose();
}
FileUtils.deleteDirectory(instanceBasePath);
}
+ public int getKeyGroupPrefixBytes() {
+ return keyGroupPrefixBytes;
+ }
+
+ /**
+ * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
+ * is also stopped when the backend is closed through {@link #close()}. For each backend, this method must always
+ * be called by the same thread.
+ *
+ * @param checkpointId The Id of the checkpoint.
+ * @param timestamp The timestamp of the checkpoint.
+ * @param streamFactory The factory that we can use for writing our state to streams.
+ *
+ * @return Future to the state handle of the snapshot data.
+ * @throws Exception
+ */
@Override
- public Future<KeyGroupsStateHandle> snapshot(
- long checkpointId,
- long timestamp,
- CheckpointStreamFactory streamFactory) throws Exception {
- throw new RuntimeException("Not implemented.");
+ public RunnableFuture<KeyGroupsStateHandle> snapshot(
+ final long checkpointId,
+ final long timestamp,
+ final CheckpointStreamFactory streamFactory) throws Exception {
+
+ long startTime = System.currentTimeMillis();
+
+ if (kvStateInformation.isEmpty()) {
+ LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + " . Returning null.");
+ return new DoneFuture<>(null);
+ }
+
+ final RocksDBSnapshotOperation snapshotOperation = new RocksDBSnapshotOperation(this, streamFactory);
+ // hold the db lock while operating on the db to guard us against async db disposal
+ synchronized (dbDisposeLock) {
+ if (db != null) {
+ snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
+ } else {
+ throw new IOException("RocksDB closed.");
+ }
+ }
+
+ // implementation of the async IO operation, based on FutureTask
+ AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+ new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+ snapshotOperation.openCheckpointStream();
+ return snapshotOperation.getOutStream();
+ }
+
+ @Override
+ public KeyGroupsStateHandle performOperation() throws Exception {
+ long startTime = System.currentTimeMillis();
+ try {
+ // hold the db lock while operating on the db to guard us against async db disposal
+ synchronized (dbDisposeLock) {
+ if (db != null) {
+ snapshotOperation.writeDBSnapshot();
+ } else {
+ throw new IOException("RocksDB closed.");
+ }
+ }
+
+ } finally {
+ snapshotOperation.closeCheckpointStream();
+ }
+
+ LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " +
+ Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+
+ return snapshotOperation.getSnapshotResultStateHandle();
+ }
+
+ @Override
+ public void done() {
+ // hold the db lock while operating on the db to guard us against async db disposal
+ synchronized (dbDisposeLock) {
+ if (db != null) {
+ snapshotOperation.releaseDBSnapshot();
+ }
+ }
+ }
+ };
+
+ LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
+ Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+
+ return AsyncStoppableTaskWithCallback.from(ioCallable);
+ }
+
+ /**
+ * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
+ */
+ static final class RocksDBSnapshotOperation {
+
+ static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
+ static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
+
+ private final RocksDBKeyedStateBackend<?> stateBackend;
+ private final KeyGroupRangeOffsets keyGroupRangeOffsets;
+ private final CheckpointStreamFactory checkpointStreamFactory;
+
+ private long checkpointId;
+ private long checkpointTimeStamp;
+
+ private Snapshot snapshot;
+ private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
+ private DataOutputView outputView;
+ private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
+ private KeyGroupsStateHandle snapshotResultStateHandle;
+
+
+
+ public RocksDBSnapshotOperation(
+ RocksDBKeyedStateBackend<?> stateBackend,
+ CheckpointStreamFactory checkpointStreamFactory) {
+
+ this.stateBackend = stateBackend;
+ this.checkpointStreamFactory = checkpointStreamFactory;
+ this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
+ }
+
+ /**
+ * 1) Create a snapshot object from RocksDB.
+ *
+ * @param checkpointId id of the checkpoint for which we take the snapshot
+ * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot
+ */
+ public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) throws IOException {
+ Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
+ this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
+ this.checkpointId = checkpointId;
+ this.checkpointTimeStamp = checkpointTimeStamp;
+ this.snapshot = stateBackend.db.getSnapshot();
+ }
+
+ /**
+ * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write.
+ *
+ * @throws Exception
+ */
+ public void openCheckpointStream() throws Exception {
+ Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
+ outStream = checkpointStreamFactory.
+ createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+ outputView = new DataOutputViewStreamWrapper(outStream);
+ }
+
+ /**
+ * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
+ *
+ * @throws InterruptedException if the snapshotting thread is interrupted (snapshot canceled)
+ * @throws IOException
+ */
+ public void writeDBSnapshot() throws IOException, InterruptedException {
+ Preconditions.checkNotNull(snapshot, "No ongoing snapshot to write.");
+ Preconditions.checkNotNull(outStream, "No output stream to write snapshot.");
+ writeKVStateMetaData();
+ writeKVStateData();
+ }
+
+ /**
+ * 4) Close the CheckpointStateOutputStream after writing and receive a state handle.
+ *
+ * @throws IOException
+ */
+ public void closeCheckpointStream() throws IOException {
+ if(outStream != null) {
+ snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
+ }
+ }
+
+ /**
+ * 5) Release the snapshot object for RocksDB and clean up.
+ *
+ */
+ public void releaseDBSnapshot() {
+ Preconditions.checkNotNull(snapshot, "No ongoing snapshot to release.");
+ stateBackend.db.releaseSnapshot(snapshot);
+ snapshot = null;
+ outStream = null;
+ outputView = null;
+ kvStateIterators = null;
+ }
+
+ /**
+ * Returns the current CheckpointStateOutputStream (when it was opened and not yet closed) into which we write
+ * the state snapshot.
+ *
+ * @return the current CheckpointStateOutputStream
+ */
+ public CheckpointStreamFactory.CheckpointStateOutputStream getOutStream() {
+ return outStream;
+ }
+
+ /**
+ * Returns a state handle to the snapshot after the snapshot procedure is completed and null before.
+ *
+ * @return state handle to the completed snapshot
+ */
+ public KeyGroupsStateHandle getSnapshotResultStateHandle() {
+ return snapshotResultStateHandle;
+ }
+
+ private void writeKVStateMetaData() throws IOException, InterruptedException {
+ //write number of k/v states
+ outputView.writeInt(stateBackend.kvStateInformation.size());
+
+ int kvStateId = 0;
+ //iterate all column families, where each column family holds one k/v state, to write the metadata
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : stateBackend.kvStateInformation.entrySet()) {
+
+ //be cooperative and check for interruption from time to time in the hot loop
+ checkInterrupted();
+
+ //write StateDescriptor for this k/v state
+ ObjectOutputStream ooOut = new ObjectOutputStream(outStream);
+ ooOut.writeObject(column.getValue().f1);
+ //retrieve iterator for this k/v states
+ ReadOptions readOptions = new ReadOptions();
+ readOptions.setSnapshot(snapshot);
+ RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
+ kvStateIterators.add(new Tuple2<RocksIterator, Integer>(iterator, kvStateId));
+ ++kvStateId;
+ }
+ }
+
+ private void writeKVStateData() throws IOException, InterruptedException {
+
+ RocksDBMergeIterator iterator = new RocksDBMergeIterator(kvStateIterators, stateBackend.keyGroupPrefixBytes);
+
+ byte[] previousKey = null;
+ byte[] previousValue = null;
+
+ //preamble: setup with first key-group as our lookahead
+ if (iterator.isValid()) {
+ //begin first key-group by recording the offset
+ keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
+ //write the k/v-state id as metadata
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(iterator.kvStateId());
+ previousKey = iterator.key();
+ previousValue = iterator.value();
+ iterator.next();
+ }
+
+ //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+ while (iterator.isValid()) {
+
+ assert (!hasMetaDataFollowsFlag(previousKey));
+
+ //set signal in first key byte that meta data will follow in the stream after this k/v pair
+ if (iterator.isNewKeyGroup() || iterator.isNewKeyValueState()) {
+
+ //be cooperative and check for interruption from time to time in the hot loop
+ checkInterrupted();
+
+ setMetaDataFollowsFlagInKey(previousKey);
+ }
+
+ writeKeyValuePair(previousKey, previousValue);
+
+ //write meta data if we have to
+ if (iterator.isNewKeyGroup()) {
+ //
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(END_OF_KEY_GROUP_MARK);
+ //begin new key-group
+ keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
+ //write the k/v-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(iterator.kvStateId());
+ } else if (iterator.isNewKeyValueState()) {
+ //write the k/v-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(iterator.kvStateId());
+ }
+
+ //request next k/v pair
+ previousKey = iterator.key();
+ previousValue = iterator.value();
+ iterator.next();
+ }
+
+ //epilogue: write last key-group
+ if (previousKey != null) {
+ assert (!hasMetaDataFollowsFlag(previousKey));
+ setMetaDataFollowsFlagInKey(previousKey);
+ writeKeyValuePair(previousKey, previousValue);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ outputView.writeShort(END_OF_KEY_GROUP_MARK);
+ }
+ }
+
+ private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException {
+ StreamStateHandle stateHandle = outStream.closeAndGetHandle();
+ outStream = null;
+ if (stateHandle != null) {
+ return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ } else {
+ throw new IOException("Output stream returned null on close.");
+ }
+ }
+
+ private void writeKeyValuePair(byte[] key, byte[] value) throws IOException {
+ BytePrimitiveArraySerializer.INSTANCE.serialize(key, outputView);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(value, outputView);
+ }
+
+ static void setMetaDataFollowsFlagInKey(byte[] key) {
+ key[0] |= FIRST_BIT_IN_BYTE_MASK;
+ }
+
+ static void clearMetaDataFollowsFlag(byte[] key) {
+ key[0] &= (~RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+ }
+
+ static boolean hasMetaDataFollowsFlag(byte[] key) {
+ return 0 != (key[0] & RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+ }
+
+ private static void checkInterrupted() throws InterruptedException {
+ if(Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Snapshot canceled.");
+ }
+ }
+ }
+
+ /**
+ * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
+ */
+ static final class RocksDBRestoreOperation {
+
+ private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend;
+
+ /** Current key-groups state handle from which we restore key-groups */
+ private KeyGroupsStateHandle currentKeyGroupsStateHandle;
+ /** Current input stream we obtained from currentKeyGroupsStateHandle; null between handles */
+ private FSDataInputStream currentStateHandleInStream;
+ /** Current data input view that wraps currentStateHandleInStream; null between handles */
+ private DataInputView currentStateHandleInView;
+ /**
+ * Column families restored from the meta data of currentKeyGroupsStateHandle. The position in this list
+ * is the k/v-state id that the data section of the snapshot refers to.
+ */
+ private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
+
+ /**
+ * Creates a restore operation object for the given state backend instance.
+ *
+ * @param rocksDBKeyedStateBackend the state backend into which we restore
+ */
+ public RocksDBRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) {
+ this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
+ }
+
+ /**
+ * Restores all key-groups data that is referenced by the passed state handles.
+ *
+ * @param keyGroupsStateHandles List of all key groups state handles that shall be restored; null entries
+ * are skipped.
+ * @throws IOException if reading from a state handle's stream fails
+ * @throws ClassNotFoundException if a serialized StateDescriptor cannot be deserialized
+ * @throws RocksDBException if creating a column family or writing into RocksDB fails
+ */
+ public void doRestore(List<KeyGroupsStateHandle> keyGroupsStateHandles)
+ throws IOException, ClassNotFoundException, RocksDBException {
+
+ for (KeyGroupsStateHandle keyGroupsStateHandle : keyGroupsStateHandles) {
+ if (keyGroupsStateHandle != null) {
+ this.currentKeyGroupsStateHandle = keyGroupsStateHandle;
+ restoreKeyGroupsInStateHandle();
+ }
+ }
+ }
+
+ /**
+ * Restores one key groups state handle: first the k/v-state meta data, then the k/v data.
+ * The stream opened for the handle is always closed before returning.
+ *
+ * @throws IOException if opening, reading or closing the handle's stream fails
+ * @throws RocksDBException if creating a column family or writing into RocksDB fails
+ * @throws ClassNotFoundException if a serialized StateDescriptor cannot be deserialized
+ */
+ private void restoreKeyGroupsInStateHandle()
+ throws IOException, RocksDBException, ClassNotFoundException {
+ try {
+ currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+ currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
+ restoreKVStateMetaData();
+ restoreKVStateData();
+ } finally {
+ final FSDataInputStream inStream = currentStateHandleInStream;
+ // reset the per-handle state before closing, so that if opening the next handle's stream fails,
+ // this finally block does not close the stream of the previous handle a second time
+ currentStateHandleInStream = null;
+ currentStateHandleInView = null;
+ if (inStream != null) {
+ inStream.close();
+ }
+ }
+ }
+
+ /**
+ * Restores the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
+ * Column families that the backend does not know yet are created.
+ *
+ * @throws IOException if reading the meta data fails
+ * @throws ClassNotFoundException if a serialized StateDescriptor cannot be deserialized
+ * @throws RocksDBException if creating a column family fails
+ */
+ private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
+ //read number of k/v states
+ int numColumns = currentStateHandleInView.readInt();
+
+ //the position in this list is the k/v-state id used by the data section
+ currentStateHandleKVStateColumnFamilies = new ArrayList<>(numColumns);
+
+ //restore the empty columns for the k/v states through the metadata
+ for (int i = 0; i < numColumns; i++) {
+ //not closed on purpose: closing an ObjectInputStream also closes the underlying state handle stream
+ ObjectInputStream ooIn = new ObjectInputStream(currentStateHandleInStream);
+ StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+ Tuple2<ColumnFamilyHandle, StateDescriptor> columnFamily = rocksDBKeyedStateBackend.
+ kvStateInformation.get(stateDescriptor.getName());
+
+ //only create a column family if the backend does not know this state yet
+ if (null == columnFamily) {
+ //NOTE(review): getBytes() uses the platform charset, same as the other column-family names here
+ ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
+ stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+
+ columnFamily = new Tuple2<>(rocksDBKeyedStateBackend.db.
+ createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+ rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily);
+ }
+
+ currentStateHandleKVStateColumnFamilies.add(columnFamily.f0);
+ }
+ }
+
+ /**
+ * Restores the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
+ * Key-groups with offset 0 are treated as empty and skipped.
+ *
+ * @throws IOException if reading the data fails
+ * @throws RocksDBException if writing into RocksDB fails
+ */
+ private void restoreKVStateData() throws IOException, RocksDBException {
+ //for all key-groups in the current state handle...
+ for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+ long offset = keyGroupOffset.f1;
+ //not empty key-group?
+ if (0L != offset) {
+ currentStateHandleInStream.seek(offset);
+ boolean keyGroupHasMoreKeys = true;
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ //decode the first id exactly like the following ids (masked), so all ids are read consistently;
+ //NOTE(review): presumably END_OF_KEY_GROUP_MARK is 0xFFFF, i.e. the short is read as unsigned
+ int kvStateId = RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK
+ & currentStateHandleInView.readShort();
+ ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ //insert all k/v pairs into DB
+ while (keyGroupHasMoreKeys) {
+ byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
+ byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
+ if (RocksDBSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+ //clear the signal bit in the key to make it ready for insertion again
+ RocksDBSnapshotOperation.clearMetaDataFollowsFlag(key);
+ rocksDBKeyedStateBackend.db.put(handle, key, value);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kvStateId = RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK
+ & currentStateHandleInView.readShort();
+ if (RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+ keyGroupHasMoreKeys = false;
+ } else {
+ handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ }
+ } else {
+ rocksDBKeyedStateBackend.db.put(handle, key, value);
+ }
+ }
+ }
+ }
+ }
+ }
}
// ------------------------------------------------------------------------
@@ -197,12 +729,14 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
if (stateInfo != null) {
if (!stateInfo.f1.equals(descriptor)) {
- throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " trying access with " + descriptor);
+ throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 +
+ " trying access with " + descriptor);
}
return stateInfo.f0;
}
- ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), columnOptions);
+ ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
+ descriptor.getName().getBytes(), columnOptions);
try {
ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
@@ -248,4 +782,206 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
+
+ /**
+ * Wraps a RocksDB iterator to cache its current key and assign an id for the key/value state to the iterator.
+ * Used by {@link RocksDBMergeIterator}.
+ */
+ static final class MergeIterator {
+
+ /** Cached key of the wrapped iterator; compared by RocksDBMergeIterator's heap ordering. */
+ private byte[] currentKey;
+ /** The wrapped RocksDB iterator. */
+ private final RocksIterator iterator;
+ /** Id of the k/v state to which the wrapped iterator belongs. */
+ private final int kvStateId;
+
+ /**
+ * Creates the wrapper and caches the key the given iterator currently points to.
+ *
+ * @param iterator The {@link RocksIterator} to wrap; its key is read immediately, so it presumably
+ * must be valid (TODO confirm with callers).
+ * @param kvStateId Id of the K/V state to which this iterator belongs.
+ */
+ public MergeIterator(RocksIterator iterator, int kvStateId) {
+ this.iterator = Preconditions.checkNotNull(iterator);
+ this.currentKey = iterator.key();
+ this.kvStateId = kvStateId;
+ }
+
+ /** Returns the cached key of the wrapped iterator. */
+ public byte[] getCurrentKey() {
+ return currentKey;
+ }
+
+ /** Replaces the cached key, e.g. after the wrapped iterator was advanced. */
+ public void setCurrentKey(byte[] currentKey) {
+ this.currentKey = currentKey;
+ }
+
+ /** Returns the wrapped RocksDB iterator. */
+ public RocksIterator getIterator() {
+ return iterator;
+ }
+
+ /** Returns the id of the k/v state to which the wrapped iterator belongs. */
+ public int getKvStateId() {
+ return kvStateId;
+ }
+ }
+
+ /**
+ * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+ static final class RocksDBMergeIterator {
+
+ /** Sub-iterators that are not currently active, ordered by (key-group, kv-state id); null if no input. */
+ private final PriorityQueue<MergeIterator> heap;
+ /** Number of leading key bytes that encode the key-group (decoded big endian in keyGroup()). */
+ private final int keyGroupPrefixByteCount;
+ /** True iff the current entry starts a new key-group. */
+ private boolean newKeyGroup;
+ /** True iff the current entry starts a new k/v-state. */
+ private boolean newKVState;
+ /** True iff the iterator currently points to an entry. */
+ private boolean valid;
+
+ /** The sub-iterator that provides the current entry. */
+ private MergeIterator currentSubIterator;
+
+ RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws IOException {
+ Preconditions.checkNotNull(kvStateIterators);
+ this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+ Comparator<MergeIterator> iteratorComparator = new Comparator<MergeIterator>() {
+ @Override
+ public int compare(MergeIterator o1, MergeIterator o2) {
+ int arrayCmpRes = compareKeyGroupsForByteArrays(
+ o1.currentKey, o2.currentKey, keyGroupPrefixByteCount);
+ // Integer.compare instead of subtraction, which could overflow for extreme ids
+ return arrayCmpRes == 0 ? Integer.compare(o1.getKvStateId(), o2.getKvStateId()) : arrayCmpRes;
+ }
+ };
+
+ if (!kvStateIterators.isEmpty()) {
+ this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+ for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+ RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
+ rocksIterator.seekToFirst();
+ // only non-empty states take part in the merge
+ if (rocksIterator.isValid()) {
+ heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+ }
+ }
+ this.valid = !heap.isEmpty();
+ this.currentSubIterator = heap.poll();
+ } else {
+ // creating a PriorityQueue with initial capacity 0 results in an IllegalArgumentException.
+ this.heap = null;
+ this.valid = false;
+ }
+
+ this.newKeyGroup = true;
+ this.newKVState = true;
+ }
+
+ /**
+ * Advances the iterator. Should only be called if {@link #isValid()} returned true. Validity can only change
+ * after calls to {@link #next()}.
+ */
+ public void next() {
+ newKeyGroup = false;
+ newKVState = false;
+
+ final RocksIterator rocksIterator = currentSubIterator.getIterator();
+ rocksIterator.next();
+
+ byte[] oldKey = currentSubIterator.getCurrentKey();
+ if (rocksIterator.isValid()) {
+ currentSubIterator.currentKey = rocksIterator.key();
+
+ // within the same key-group the current state continues; otherwise re-sort via the heap
+ if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+ heap.offer(currentSubIterator);
+ currentSubIterator = heap.poll();
+ newKVState = currentSubIterator.getIterator() != rocksIterator;
+ detectNewKeyGroup(oldKey);
+ }
+ } else if (heap.isEmpty()) {
+ valid = false;
+ } else {
+ currentSubIterator = heap.poll();
+ newKVState = true;
+ detectNewKeyGroup(oldKey);
+ }
+
+ }
+
+ /** Returns true iff the key-group prefixes of the two keys differ. */
+ private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+ return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
+ }
+
+ /** Sets newKeyGroup if the current key is in a different key-group than oldKey. */
+ private void detectNewKeyGroup(byte[] oldKey) {
+ if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
+ newKeyGroup = true;
+ }
+ }
+
+ /**
+ * Returns the key-group for the current key.
+ * @return key-group for the current key
+ */
+ public int keyGroup() {
+ int result = 0;
+ //big endian decode
+ for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+ result <<= 8;
+ result |= (currentSubIterator.currentKey[i] & 0xFF);
+ }
+ return result;
+ }
+
+ /** Returns the current key. */
+ public byte[] key() {
+ return currentSubIterator.getCurrentKey();
+ }
+
+ /** Returns the value of the current entry. */
+ public byte[] value() {
+ return currentSubIterator.getIterator().value();
+ }
+
+ /**
+ * Returns the Id of the k/v state to which the current key belongs.
+ * @return Id of K/V state to which the current key belongs.
+ */
+ public int kvStateId() {
+ return currentSubIterator.getKvStateId();
+ }
+
+ /**
+ * Indicates if the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
+ * @return true iff the current key belongs to a different k/v-state than its predecessor.
+ */
+ public boolean isNewKeyValueState() {
+ return newKVState;
+ }
+
+ /**
+ * Indicates if the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
+ * @return true iff the current key belongs to a different key-group than its predecessor.
+ */
+ public boolean isNewKeyGroup() {
+ return newKeyGroup;
+ }
+
+ /**
+ * Checks if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
+ * {@link #next()} should only be called if valid returned true. Should be checked after each call to
+ * {@link #next()} before accessing iterator state.
+ * @return True iff this iterator is valid.
+ */
+ public boolean isValid() {
+ return valid;
+ }
+
+ /** Compares the first len bytes of a and b as unsigned values, lexicographically. */
+ private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
+ for (int i = 0; i < len; ++i) {
+ int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * Returns the base path of this backend instance.
+ *
+ * <p>Only visible for testing, DO NOT USE.
+ */
+ public File getInstanceBasePath() {
+ return this.instanceBasePath;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index d8f937b..beea81a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -28,7 +28,6 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -67,7 +66,7 @@ public class RocksDBListState<K, N, V>
public RocksDBListState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
- RocksDBKeyedStateBackend backend) {
+ RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, stateDesc, backend);
this.valueSerializer = stateDesc.getSerializer();
@@ -78,11 +77,9 @@ public class RocksDBListState<K, N, V>
@Override
public Iterable<V> get() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
@@ -107,16 +104,13 @@ public class RocksDBListState<K, N, V>
@Override
public void add(V value) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
-
- baos.reset();
-
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
+ keySerializationStream.reset();
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
valueSerializer.serialize(value, out);
- backend.db.merge(columnFamily, writeOptions, key, baos.toByteArray());
+ backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 15ae493..068c051 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -29,7 +29,6 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
@@ -65,7 +64,7 @@ public class RocksDBReducingState<K, N, V>
public RocksDBReducingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
- RocksDBKeyedStateBackend backend) {
+ RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, stateDesc, backend);
this.valueSerializer = stateDesc.getSerializer();
@@ -77,11 +76,9 @@ public class RocksDBReducingState<K, N, V>
@Override
public V get() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return null;
@@ -94,23 +91,22 @@ public class RocksDBReducingState<K, N, V>
@Override
public void add(V value) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
if (valueBytes == null) {
- baos.reset();
+ keySerializationStream.reset();
valueSerializer.serialize(value, out);
- backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} else {
V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
V newValue = reduceFunction.reduce(oldValue, value);
- baos.reset();
+ keySerializationStream.reset();
valueSerializer.serialize(newValue, out);
- backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
}
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 62b71d9..f950751 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -221,12 +221,6 @@ public class RocksDBStateBackend extends AbstractStateBackend {
@Override
public CheckpointStreamFactory createStreamFactory(JobID jobId,
String operatorIdentifier) throws IOException {
- return null;
- }
-
- if (fullyAsyncBackup) {
- return performFullyAsyncSnapshot(checkpointId, timestamp);
- } else {
return checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier);
}
@@ -261,10 +255,24 @@ public class RocksDBStateBackend extends AbstractStateBackend {
String operatorIdentifier,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
- KeyGroupRange keyGroupRange,
+ KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception {
- throw new RuntimeException("Not implemented.");
+
+ lazyInitializeForJob(env, operatorIdentifier);
+
+ File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString());
+ return new RocksDBKeyedStateBackend<>(
+ jobID,
+ operatorIdentifier,
+ instanceBasePath,
+ getDbOptions(),
+ getColumnOptions(),
+ kvStateRegistry,
+ keySerializer,
+ keyGroupAssigner,
+ keyGroupRange,
+ restoredState);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index b9c0e83..9563ed8 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -24,13 +24,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
@@ -63,7 +61,7 @@ public class RocksDBValueState<K, N, V>
public RocksDBValueState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
- RocksDBKeyedStateBackend backend) {
+ RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, stateDesc, backend);
this.valueSerializer = stateDesc.getSerializer();
@@ -74,11 +72,9 @@ public class RocksDBValueState<K, N, V>
@Override
public V value() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return stateDesc.getDefaultValue();
@@ -95,14 +91,13 @@ public class RocksDBValueState<K, N, V>
clear();
return;
}
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
try {
- writeKeyAndNamespace(out);
- byte[] key = baos.toByteArray();
- baos.reset();
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
+ keySerializationStream.reset();
valueSerializer.serialize(value, out);
- backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
@@ -110,11 +105,7 @@ public class RocksDBValueState<K, N, V>
@Override
public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- // Serialized key and namespace is expected to be of the same format
- // as writeKeyAndNamespace()
- Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-
- byte[] value = backend.db.get(columnFamily, serializedKeyAndNamespace);
+ byte[] value = super.getSerializedValue(serializedKeyAndNamespace);
if (value != null) {
return value;
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
deleted file mode 100644
index 0e35b60..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for asynchronous RocksDB Key/Value state checkpoints.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-@SuppressWarnings("serial")
-public class RocksDBAsyncKVSnapshotTest {
-
- @Before
- public void checkOperatingSystem() {
- Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
- }
-
- /**
- * This ensures that asynchronous state handles are actually materialized asynchonously.
- *
- * <p>We use latches to block at various stages and see if the code still continues through
- * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
- * test will simply lock forever.
- */
- @Test
- public void testAsyncCheckpoints() throws Exception {
- LocalFileSystem localFS = new LocalFileSystem();
- localFS.initialize(new URI("file:///"), new Configuration());
- PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
-
- final OneShotLatch delayCheckpointLatch = new OneShotLatch();
- final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-
- final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.configureForKeyedStream(new KeySelector<String, String>() {
- @Override
- public String getKey(String value) throws Exception {
- return value;
- }
- }, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
-
- File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
- File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
-
- RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
- backend.setDbStoragePath(dbDir.getAbsolutePath());
-
- streamConfig.setStateBackend(backend);
-
- streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-
- StreamMockEnvironment mockEnv = new StreamMockEnvironment(
- testHarness.jobConfig,
- testHarness.taskConfig,
- testHarness.memorySize,
- new MockInputSplitProvider(),
- testHarness.bufferSize) {
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId) {
- super.acknowledgeCheckpoint(checkpointId);
- }
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles) {
- super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
-
- // block on the latch, to verify that triggerCheckpoint returns below,
- // even though the async checkpoint would not finish
- try {
- delayCheckpointLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
-
- // should be only one k/v state
-
- assertEquals(1, keyGroupStateHandles.size());
-
- // we now know that the checkpoint went through
- ensureCheckpointLatch.trigger();
- }
- };
-
- testHarness.invoke(mockEnv);
-
- // wait for the task to be running
- for (Field field: StreamTask.class.getDeclaredFields()) {
- if (field.getName().equals("isRunning")) {
- field.setAccessible(true);
- while (!field.getBoolean(task)) {
- Thread.sleep(10);
- }
-
- }
- }
-
- testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
- task.triggerCheckpoint(42, 17);
-
- // now we allow the checkpoint
- delayCheckpointLatch.trigger();
-
- // wait for the checkpoint to go through
- ensureCheckpointLatch.await();
-
- testHarness.endInput();
- testHarness.waitForTaskCompletion();
- }
-
-	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchronously.
-	 *
-	 * <p>We use latches to block at various stages and see if the code still continues through
-	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
-	 * test will simply block forever.
-	 */
-	@Test
-	public void testFullyAsyncCheckpoints() throws Exception {
-		LocalFileSystem localFS = new LocalFileSystem();
-		localFS.initialize(new URI("file:///"), new Configuration());
-		PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
-
-		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
-		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-
-		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
-			@Override
-			public String getKey(String value) throws Exception {
-				return value;
-			}
-		}, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-
-		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
-
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
-		backend.setDbStoragePath(dbDir.getAbsolutePath());
-		// NOTE(review): fully-async snapshots are not explicitly enabled here (a former call to
-		// backend.enableFullyAsyncSnapshots() was commented out) - confirm that this test still
-		// exercises the fully-asynchronous snapshot path its name refers to.
-
-		streamConfig.setStateBackend(backend);
-		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-
-		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-				testHarness.jobConfig,
-				testHarness.taskConfig,
-				testHarness.memorySize,
-				new MockInputSplitProvider(),
-				testHarness.bufferSize) {
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId,
-					ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-					List<KeyGroupsStateHandle> keyGroupStateHandles) {
-				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
-
-				// block on the latch, to verify that triggerCheckpoint returns below,
-				// even though the async checkpoint would not finish
-				try {
-					delayCheckpointLatch.await();
-				} catch (InterruptedException e) {
-					// restore the interrupt flag instead of silently dropping the interruption
-					Thread.currentThread().interrupt();
-				}
-
-				// should be only one k/v state
-				assertEquals(1, keyGroupStateHandles.size());
-
-				// we now know that the checkpoint went through
-				ensureCheckpointLatch.trigger();
-			}
-		};
-
-		testHarness.invoke(mockEnv);
-
-		// wait for the task to be running; getDeclaredField fails loudly if the field is
-		// renamed, instead of silently skipping the wait
-		Field isRunningField = StreamTask.class.getDeclaredField("isRunning");
-		isRunningField.setAccessible(true);
-		while (!isRunningField.getBoolean(task)) {
-			Thread.sleep(10);
-		}
-
-		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
-		task.triggerCheckpoint(42, 17);
-
-		// now we allow the checkpoint
-		delayCheckpointLatch.trigger();
-
-		// wait for the checkpoint to go through
-		ensureCheckpointLatch.await();
-
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion();
-	}
-
-
- // ------------------------------------------------------------------------
-
-	/**
-	 * Operator that registers a {@link ValueState} in {@code open()} and stores the value of each
-	 * processed element in it.
-	 */
-	public static class AsyncCheckpointOperator
-			extends AbstractStreamOperator<String>
-			implements OneInputStreamOperator<String, String> {
-
-		@Override
-		public void open() throws Exception {
-			super.open();
-
-			// also get the state in open, this way we are sure that it was created before
-			// we trigger the test checkpoint
-			getCountState();
-		}
-
-		@Override
-		public void processElement(StreamRecord<String> element) throws Exception {
-			getCountState().update(element.getValue());
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			// not interested
-		}
-
-		/** Looks up (creating on first access) the "count" value state in the void namespace. */
-		private ValueState<String> getCountState() throws Exception {
-			return getPartitionedState(
-					VoidNamespace.INSTANCE,
-					VoidNamespaceSerializer.INSTANCE,
-					new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
-		}
-	}
-
- public static class DummyMapFunction<T> implements MapFunction<T, T> {
- @Override
- public T map(T value) { return value; }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
new file mode 100644
index 0000000..624905c
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for asynchronous RocksDB Key/Value state checkpoints.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+@SuppressWarnings("serial")
+public class RocksDBAsyncSnapshotTest {
+
+	@Before
+	public void checkOperatingSystem() {
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
+	/**
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
+	 *
+	 * <p>We use latches to block at various stages and see if the code still continues through
+	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
+	 * test will simply block forever.
+	 */
+	@Test
+	public void testFullyAsyncSnapshot() throws Exception {
+		stubLocalHadoopFileSystem();
+
+		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness =
+				createKeyedTestHarness(task, new MemoryStateBackend());
+
+		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
+		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
+
+		// number of key-group state handles passed to the acknowledgement; -1 until it arrives
+		final AtomicInteger numKeyGroupStateHandles = new AtomicInteger(-1);
+
+		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+				testHarness.jobConfig,
+				testHarness.taskConfig,
+				testHarness.memorySize,
+				new MockInputSplitProvider(),
+				testHarness.bufferSize) {
+
+			@Override
+			public void acknowledgeCheckpoint(long checkpointId,
+					ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+					List<KeyGroupsStateHandle> keyGroupStateHandles) {
+				try {
+					super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
+					numKeyGroupStateHandles.set(keyGroupStateHandles == null ? 0 : keyGroupStateHandles.size());
+
+					// block on the latch, to verify that triggerCheckpoint returns below,
+					// even though the async checkpoint would not finish
+					delayCheckpointLatch.await();
+				} catch (InterruptedException e) {
+					// restore the interrupt flag instead of silently dropping the interruption
+					Thread.currentThread().interrupt();
+				} finally {
+					// always release the test thread and verify the outcome there: an assertion
+					// failing inside this callback would never reach the trigger and would leave
+					// the test thread waiting forever instead of reporting the failure
+					ensureCheckpointLatch.trigger();
+				}
+			}
+		};
+
+		testHarness.invoke(mockEnv);
+		waitUntilRunning(task);
+
+		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
+		task.triggerCheckpoint(42, 17);
+
+		// now we allow the checkpoint
+		delayCheckpointLatch.trigger();
+
+		// wait for the checkpoint to go through
+		ensureCheckpointLatch.await();
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+
+		assertEquals("should be only one k/v state", 1, numKeyGroupStateHandles.get());
+	}
+
+	/**
+	 * This test ensures that canceling of asynchronous snapshots works as expected and does not block.
+	 */
+	@Test
+	public void testCancelFullyAsyncCheckpoints() throws Exception {
+		stubLocalHadoopFileSystem();
+
+		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+
+		// ensure that the async threads complete before the invoke method of the task returns
+		task.setThreadPoolTerminationTimeout(Long.MAX_VALUE);
+
+		final OneInputStreamTaskTestHarness<String, String> testHarness =
+				createKeyedTestHarness(task, new BlockingStreamMemoryStateBackend());
+
+		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+				testHarness.jobConfig,
+				testHarness.taskConfig,
+				testHarness.memorySize,
+				new MockInputSplitProvider(),
+				testHarness.bufferSize);
+
+		testHarness.invoke(mockEnv);
+		waitUntilRunning(task);
+
+		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
+		task.triggerCheckpoint(42, 17);
+
+		// cancel while the snapshot is blocked in its first write, then let the write continue
+		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
+		task.cancel();
+		BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger();
+
+		testHarness.endInput();
+		try {
+			testHarness.waitForTaskCompletion();
+			Assert.fail("Operation completed. Cancel failed.");
+		} catch (Exception expected) {
+			// we expect the exception from canceling snapshots, wrapped in an AsynchronousException
+			Throwable cause = expected.getCause();
+			if (!(cause instanceof AsynchronousException)) {
+				throw new AssertionError("Unexpected exception: " + expected, expected);
+			}
+
+			Throwable asyncCause = cause.getCause();
+			Assert.assertTrue("Unexpected Exception: " + asyncCause,
+					asyncCause instanceof CancellationException // future canceled
+							|| asyncCause instanceof InterruptedException); // thread interrupted
+		}
+	}
+
+	/**
+	 * Checks the snapshot serialization constants and that setting and clearing the
+	 * "meta data follows" flag on a key round-trips to the original bytes.
+	 */
+	@Test
+	public void testConsistentSnapshotSerializationFlagsAndMasks() {
+
+		Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK);
+		Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+
+		byte[] expectedKey = new byte[] {42, 42};
+		byte[] modKey = expectedKey.clone();
+
+		Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+		RocksDBKeyedStateBackend.RocksDBSnapshotOperation.setMetaDataFollowsFlagInKey(modKey);
+		Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+		RocksDBKeyedStateBackend.RocksDBSnapshotOperation.clearMetaDataFollowsFlag(modKey);
+		Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+		Assert.assertTrue(Arrays.equals(expectedKey, modKey));
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Stubs {@code FileSystem.get(URI, Configuration)} so that it always returns an initialized
+	 * Hadoop {@link LocalFileSystem}.
+	 */
+	private static void stubLocalHadoopFileSystem() throws Exception {
+		LocalFileSystem localFS = new LocalFileSystem();
+		localFS.initialize(new URI("file:///"), new Configuration());
+		PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
+	}
+
+	/**
+	 * Creates a test harness for the given task that keys every string by itself and runs an
+	 * {@link AsyncCheckpointOperator} on a {@link RocksDBStateBackend}. The RocksDB storage
+	 * directory and the checkpoint directory are fresh random sub-directories of
+	 * {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH}.
+	 *
+	 * @param task the task under test
+	 * @param wrappedBackend backend handed to the {@link RocksDBStateBackend} constructor
+	 *                       (presumably the one that provides the checkpoint streams - the
+	 *                       cancel test relies on its streams being used for snapshot writes)
+	 * @return the configured, not yet invoked, test harness
+	 */
+	private static OneInputStreamTaskTestHarness<String, String> createKeyedTestHarness(
+			OneInputStreamTask<String, String> task,
+			MemoryStateBackend wrappedBackend) throws Exception {
+
+		OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+				task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
+			@Override
+			public String getKey(String value) throws Exception {
+				return value;
+			}
+		}, BasicTypeInfo.STRING_TYPE_INFO);
+
+		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
+		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
+
+		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), wrappedBackend);
+		backend.setDbStoragePath(dbDir.getAbsolutePath());
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStateBackend(backend);
+		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+
+		return testHarness;
+	}
+
+	/**
+	 * Polls the {@code isRunning} field of {@link StreamTask} until it is {@code true} for the
+	 * given task. Looking the field up by name fails with a {@link NoSuchFieldException} if it
+	 * does not exist, rather than silently skipping the wait.
+	 */
+	private static void waitUntilRunning(StreamTask<?, ?> task) throws Exception {
+		Field isRunningField = StreamTask.class.getDeclaredField("isRunning");
+		isRunningField.setAccessible(true);
+		while (!isRunningField.getBoolean(task)) {
+			Thread.sleep(10);
+		}
+	}
+
+	/**
+	 * Creates us a CheckpointStateOutputStream that blocks write ops on a latch to delay writing of snapshots.
+	 */
+	static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {
+
+		/** Triggered by the first write to any checkpoint stream created by this backend. */
+		public static final OneShotLatch waitFirstWriteLatch = new OneShotLatch();
+
+		/** Every write blocks until this latch has been triggered. */
+		public static final OneShotLatch unblockCancelLatch = new OneShotLatch();
+
+		// NOTE(review): this flag is shared by all streams of this backend, so closing any one
+		// stream makes subsequent writes to every stream fail - confirm this is intended.
+		volatile boolean closed = false;
+
+		@Override
+		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+			return new MemCheckpointStreamFactory(4 * 1024 * 1024) {
+				@Override
+				public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+
+					return new MemoryCheckpointOutputStream(4 * 1024 * 1024) {
+						@Override
+						public void write(int b) throws IOException {
+							awaitUnblockAndEnsureOpen();
+							super.write(b);
+						}
+
+						@Override
+						public void write(byte[] b, int off, int len) throws IOException {
+							awaitUnblockAndEnsureOpen();
+							super.write(b, off, len);
+						}
+
+						@Override
+						public void close() {
+							closed = true;
+							super.close();
+						}
+
+						/**
+						 * Signals the first write, blocks until the test unblocks writing, and
+						 * fails if the backend's closed flag has been set in the meantime.
+						 */
+						private void awaitUnblockAndEnsureOpen() throws IOException {
+							waitFirstWriteLatch.trigger();
+							try {
+								unblockCancelLatch.await();
+							} catch (InterruptedException e) {
+								Thread.currentThread().interrupt();
+							}
+							if (closed) {
+								throw new IOException("Stream closed.");
+							}
+						}
+					};
+				}
+			};
+		}
+	}
+
+	/**
+	 * Operator that registers a {@link ValueState} in {@code open()} and stores the value of each
+	 * processed element in it.
+	 */
+	public static class AsyncCheckpointOperator
+			extends AbstractStreamOperator<String>
+			implements OneInputStreamOperator<String, String> {
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			// also get the state in open, this way we are sure that it was created before
+			// we trigger the test checkpoint
+			getCountState();
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			getCountState().update(element.getValue());
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			// not interested
+		}
+
+		/** Looks up (creating on first access) the "count" value state in the void namespace. */
+		private ValueState<String> getCountState() throws Exception {
+			return getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
+		}
+	}
+
+	/** Identity map function. */
+	public static class DummyMapFunction<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+}