You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/20 16:07:25 UTC
[2/2] flink git commit: [FLINK-3718] Add Option For Completely Async
Backup in RocksDB State Backend
[FLINK-3718] Add Option For Completely Async Backup in RocksDB State Backend
This also refactors the RocksDB backend to keep one RocksDB database in
the backend where all key/value state is stored. Individual named
key/value states get a reference to the db and store their state in a
column family. This way, we only have to back up one RocksDB database
and can centrally decide how to do backups.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c33325e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c33325e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c33325e3
Branch: refs/heads/master
Commit: c33325e3ffb9acecfc9c562b21a3ac703bae983d
Parents: 77f1a2e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Apr 8 14:58:50 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Apr 20 16:06:41 2016 +0200
----------------------------------------------------------------------
.../streaming/state/AbstractRocksDBState.java | 381 +---------
.../contrib/streaming/state/OptionsFactory.java | 21 +-
.../streaming/state/PredefinedOptions.java | 70 +-
.../streaming/state/RocksDBFoldingState.java | 106 +--
.../streaming/state/RocksDBListState.java | 101 +--
.../streaming/state/RocksDBReducingState.java | 102 +--
.../streaming/state/RocksDBStateBackend.java | 740 +++++++++++++++++--
.../streaming/state/RocksDBValueState.java | 101 +--
.../FullyAsyncRocksDBStateBackendTest.java | 65 ++
.../state/RocksDBAsyncKVSnapshotTest.java | 106 ++-
.../state/RocksDBStateBackendConfigTest.java | 40 +-
.../common/state/FoldingStateDescriptor.java | 31 +
.../api/common/state/ListStateDescriptor.java | 29 +
.../common/state/ReducingStateDescriptor.java | 30 +
.../flink/api/common/state/StateDescriptor.java | 23 +-
.../api/common/state/ValueStateDescriptor.java | 31 +
.../flink/hdfstests/FileStateBackendTest.java | 15 +-
.../runtime/state/AbstractStateBackend.java | 8 +-
.../runtime/state/FileStateBackendTest.java | 13 +
.../runtime/state/MemoryStateBackendTest.java | 13 +
.../runtime/state/StateBackendTestBase.java | 627 ++++++++--------
.../EventTimeWindowCheckpointingITCase.java | 16 +-
22 files changed, 1536 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 74e0509..ec58a63 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -17,195 +17,64 @@
package org.apache.flink.contrib.streaming.state;
-import org.apache.commons.io.FileUtils;
-
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
+
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
-
-import org.apache.flink.streaming.util.HDFSCopyFromLocal;
-import org.apache.flink.streaming.util.HDFSCopyToLocal;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.rocksdb.BackupEngine;
-import org.rocksdb.BackupableDBOptions;
-import org.rocksdb.Env;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RestoreOptions;
-import org.rocksdb.RocksDB;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.util.UUID;
-
-import static java.util.Objects.requireNonNull;
/**
* Base class for {@link State} implementations that store state in a RocksDB database.
*
- * <p>This base class is responsible for setting up the RocksDB database, for
- * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The
- * concrete subclasses just use the RocksDB handle to store/retrieve state.
- *
- * <p>State is checkpointed asynchronously. The synchronous part is drawing the actual backup
- * from RocksDB, this is done in {@link #snapshot(long, long)}. This will return a
- * {@link AsyncRocksDBSnapshot} that will perform the copying of the backup to the remote
- * file system.
+ * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
+ * the {@link RocksDBStateBackend} manages and checkpoints.
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <S> The type of {@link State}.
* @param <SD> The type of {@link StateDescriptor}.
*/
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
- implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
+abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
+ implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
- /** Serializer for the keys */
- protected final TypeSerializer<K> keySerializer;
-
/** Serializer for the namespace */
- protected final TypeSerializer<N> namespaceSerializer;
-
- /** The current key, which the next value methods will refer to */
- protected K currentKey;
+ private final TypeSerializer<N> namespaceSerializer;
/** The current namespace, which the next value methods will refer to */
- protected N currentNamespace;
-
- /** Store it so that we can clean up in dispose() */
- protected final File basePath;
+ private N currentNamespace;
- /** FileSystem path where checkpoints are stored */
- protected final String checkpointPath;
+ /** Backend that holds the actual RocksDB instance where we store state */
+ protected RocksDBStateBackend backend;
- /** Directory in "basePath" where the actual RocksDB data base instance stores its files */
- protected final File rocksDbPath;
-
- /** Our RocksDB instance */
- protected final RocksDB db;
+ /** The column family of this particular instance of state */
+ ColumnFamilyHandle columnFamily;
/**
* Creates a new RocksDB backed state.
*
- * @param keySerializer The serializer for the keys.
* @param namespaceSerializer The serializer for the namespace.
- * @param basePath The path on the local system where RocksDB data should be stored.
*/
- protected AbstractRocksDBState(
- TypeSerializer<K> keySerializer,
+ AbstractRocksDBState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
- File basePath,
- String checkpointPath,
- Options options) {
-
- rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
+ RocksDBStateBackend backend) {
- this.keySerializer = requireNonNull(keySerializer);
this.namespaceSerializer = namespaceSerializer;
- this.basePath = basePath;
- this.checkpointPath = checkpointPath;
-
- RocksDB.loadLibrary();
-
- if (!basePath.exists()) {
- if (!basePath.mkdirs()) {
- throw new RuntimeException("Could not create RocksDB data directory.");
- }
- }
-
- // clean it, this will remove the last part of the path but RocksDB will recreate it
- try {
- if (rocksDbPath.exists()) {
- LOG.warn("Deleting already existing db directory {}.", rocksDbPath);
- FileUtils.deleteDirectory(rocksDbPath);
- }
- } catch (IOException e) {
- throw new RuntimeException("Error cleaning RocksDB data directory.", e);
- }
-
- try {
- db = RocksDB.open(options, rocksDbPath.getAbsolutePath());
- } catch (RocksDBException e) {
- throw new RuntimeException("Error while opening RocksDB instance.", e);
- }
+ this.backend = backend;
- }
-
- /**
- * Creates a new RocksDB backed state and restores from the given backup directory. After
- * restoring the backup directory is deleted.
- *
- * @param keySerializer The serializer for the keys.
- * @param namespaceSerializer The serializer for the namespace.
- * @param basePath The path on the local system where RocksDB data should be stored.
- * @param restorePath The path to a backup directory from which to restore RocksDb database.
- */
- protected AbstractRocksDBState(
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- File basePath,
- String checkpointPath,
- String restorePath,
- Options options) {
-
- rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
-
- RocksDB.loadLibrary();
-
- // clean it, this will remove the last part of the path but RocksDB will recreate it
- try {
- if (rocksDbPath.exists()) {
- LOG.warn("Deleting already existing db directory {}.", rocksDbPath);
- FileUtils.deleteDirectory(rocksDbPath);
- }
- } catch (IOException e) {
- throw new RuntimeException("Error cleaning RocksDB data directory.", e);
- }
-
- try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(restorePath + "/"))) {
- backupEngine.restoreDbFromLatestBackup(rocksDbPath.getAbsolutePath(), rocksDbPath.getAbsolutePath(), new RestoreOptions(true));
- } catch (RocksDBException|IllegalArgumentException e) {
- throw new RuntimeException("Error while restoring RocksDB state from " + restorePath, e);
- } finally {
- try {
- FileUtils.deleteDirectory(new File(restorePath));
- } catch (IOException e) {
- LOG.error("Error cleaning up local restore directory " + restorePath, e);
- }
- }
-
- this.keySerializer = requireNonNull(keySerializer);
- this.namespaceSerializer = namespaceSerializer;
- this.basePath = basePath;
- this.checkpointPath = checkpointPath;
-
- if (!basePath.exists()) {
- if (!basePath.mkdirs()) {
- throw new RuntimeException("Could not create RocksDB data directory.");
- }
- }
-
- try {
- db = RocksDB.open(options, rocksDbPath.getAbsolutePath());
- } catch (RocksDBException e) {
- throw new RuntimeException("Error while opening RocksDB instance.", e);
- }
+ this.columnFamily = columnFamily;
}
// ------------------------------------------------------------------------
@@ -217,234 +86,38 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
- db.remove(key);
+ backend.db.remove(columnFamily, key);
} catch (IOException|RocksDBException e) {
throw new RuntimeException("Error while removing entry from RocksDB", e);
}
}
- protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
- keySerializer.serialize(currentKey, out);
+ void writeKeyAndNamespace(DataOutputView out) throws IOException {
+ backend.keySerializer().serialize(backend.currentKey(), out);
out.writeByte(42);
namespaceSerializer.serialize(currentNamespace, out);
}
@Override
- final public void setCurrentKey(K currentKey) {
- this.currentKey = currentKey;
- }
-
- @Override
final public void setCurrentNamespace(N namespace) {
this.currentNamespace = namespace;
}
- protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI backupUri, long checkpointId);
-
- @Override
- public final KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(final long checkpointId, long timestamp) throws Exception {
-
- final File localBackupPath = new File(basePath, "local-chk-" + checkpointId);
- final URI backupUri = new URI(checkpointPath + "/chk-" + checkpointId);
-
-
- if (!localBackupPath.exists()) {
- if (!localBackupPath.mkdirs()) {
- throw new RuntimeException("Could not create local backup path " + localBackupPath);
- }
- }
-
- long startTime = System.currentTimeMillis();
- BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
- // we disabled the WAL
- backupOptions.setBackupLogFiles(false);
- // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
- backupOptions.setSync(false);
- try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(),
- backupOptions)) {
- // make sure to flush because we don't write to the write-ahead-log
- db.flush(new FlushOptions().setWaitForFlush(true));
- backupEngine.createNewBackup(db);
- }
- long endTime = System.currentTimeMillis();
- LOG.info("RocksDB (" + rocksDbPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
-
- return new AsyncRocksDBSnapshot<>(
- localBackupPath,
- backupUri,
- checkpointId,
- this);
- }
-
@Override
final public void dispose() {
- db.dispose();
- try {
- FileUtils.deleteDirectory(basePath);
- } catch (IOException e) {
- throw new RuntimeException("Error disposing RocksDB data directory.", e);
- }
+ // ignore because we don't hold any state ourselves
}
- protected static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
- implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
-
- // ------------------------------------------------------------------------
- // Ctor parameters for RocksDB state
- // ------------------------------------------------------------------------
-
- /** Store it so that we can clean up in dispose() */
- protected final File basePath;
-
- /** Where we should put RocksDB backups */
- protected final String checkpointPath;
-
- // ------------------------------------------------------------------------
- // Info about this checkpoint
- // ------------------------------------------------------------------------
-
- protected final URI backupUri;
-
- protected long checkpointId;
-
- // ------------------------------------------------------------------------
- // For sanity checks
- // ------------------------------------------------------------------------
-
- /** Key serializer */
- protected final TypeSerializer<K> keySerializer;
-
- /** Namespace serializer */
- protected final TypeSerializer<N> namespaceSerializer;
-
- /** Hash of the StateDescriptor, for sanity checks */
- protected final SD stateDesc;
-
- /**
- * Creates a new snapshot from the given state parameters.
- */
- public AbstractRocksDBSnapshot(File basePath,
- String checkpointPath,
- URI backupUri,
- long checkpointId,
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- SD stateDesc) {
-
- this.basePath = basePath;
- this.checkpointPath = checkpointPath;
- this.backupUri = backupUri;
- this.checkpointId = checkpointId;
-
- this.stateDesc = stateDesc;
- this.keySerializer = keySerializer;
- this.namespaceSerializer = namespaceSerializer;
- }
-
- /**
- * Subclasses must implement this for creating a concrete RocksDB state.
- */
- protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState(
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- SD stateDesc,
- File basePath,
- String backupPath,
- String restorePath,
- Options options) throws Exception;
-
- @Override
- public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState(
- RocksDBStateBackend stateBackend,
- TypeSerializer<K> keySerializer,
- ClassLoader classLoader,
- long recoveryTimestamp) throws Exception {
-
- // validity checks
- if (!this.keySerializer.equals(keySerializer)) {
- throw new IllegalArgumentException(
- "Cannot restore the state from the snapshot with the given serializers. " +
- "State (K/V) was serialized with " +
- "(" + keySerializer + ") " +
- "now is (" + keySerializer + ")");
- }
-
- if (!basePath.exists()) {
- if (!basePath.mkdirs()) {
- throw new RuntimeException("Could not create RocksDB base path " + basePath);
- }
- }
-
- final File localBackupPath = new File(basePath, "chk-" + checkpointId);
-
- if (localBackupPath.exists()) {
- try {
- LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
- FileUtils.deleteDirectory(localBackupPath);
- } catch (IOException e) {
- throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
- }
- }
-
- HDFSCopyToLocal.copyToLocal(backupUri, basePath);
- return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, basePath,
- checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions());
- }
-
- @Override
- public final void discardState() throws Exception {
- FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
- fs.delete(new Path(backupUri), true);
- }
+ @Override
+ public void setCurrentKey(K key) {
+ // ignore because the current key is held by the RocksDBStateBackend, not by this state
- @Override
- public final long getStateSize() throws Exception {
- FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
- return fs.getContentSummary(new Path(backupUri)).getLength();
- }
}
- /**
- * Upon snapshotting the RocksDB backup is created synchronously. The asynchronous part is
- * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()}
- * of this class.
- */
- private static class AsyncRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>> extends AsynchronousKvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
- private static final long serialVersionUID = 1L;
- private final File localBackupPath;
- private final URI backupUri;
- private final long checkpointId;
- private transient AbstractRocksDBState<K, N, S, SD> state;
-
- public AsyncRocksDBSnapshot(File localBackupPath,
- URI backupUri,
- long checkpointId,
- AbstractRocksDBState<K, N, S, SD> state) {
- this.localBackupPath = localBackupPath;
- this.backupUri = backupUri;
- this.checkpointId = checkpointId;
- this.state = state;
- }
-
- @Override
- public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> materialize() throws Exception {
- try {
- long startTime = System.currentTimeMillis();
- HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
- long endTime = System.currentTimeMillis();
- LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
- return state.createRocksDBSnapshot(backupUri, checkpointId);
- } catch (Exception e) {
- FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
- fs.delete(new Path(backupUri), true);
- throw e;
- } finally {
- FileUtils.deleteQuietly(localBackupPath);
- }
- }
+ @Override
+ public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
+ long timestamp) throws Exception {
+ throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 3e52f1f..863c5da 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -18,10 +18,11 @@
package org.apache.flink.contrib.streaming.state;
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
/**
- * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
+ * A factory for {@link DBOptions} and {@link ColumnFamilyOptions} to be passed to the {@link RocksDBStateBackend}.
* Options have to be created lazily by this factory, because the {@code Options}
* class is not serializable and holds pointers to native code.
*
@@ -55,5 +56,19 @@ public interface OptionsFactory extends java.io.Serializable {
* @param currentOptions The options object with the pre-defined options.
* @return The options object on which the additional options are set.
*/
- Options createOptions(Options currentOptions);
+ DBOptions createDBOptions(DBOptions currentOptions);
+
+ /**
+ * This method should set the additional column family options on top of the current options object.
+ * The current options object may contain pre-defined options based on flags that have
+ * been configured on the state backend.
+ *
+ * <p>It is important to set the options on the current object and return the result from
+ * the setter methods, otherwise the pre-defined options may get lost.
+ *
+ * @param currentOptions The options object with the pre-defined options.
+ * @return The options object on which the additional options are set.
+ */
+ ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index c19b54f..93aac85 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -19,8 +19,10 @@
package org.apache.flink.contrib.streaming.state;
import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
+import org.rocksdb.DBOptions;
+import org.rocksdb.StringAppendOperator;
/**
* The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
@@ -42,11 +44,18 @@ public enum PredefinedOptions {
DEFAULT {
@Override
- public Options createOptions() {
- return new Options()
+ public DBOptions createDBOptions() {
+ return new DBOptions()
.setUseFsync(false)
.setDisableDataSync(true);
}
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions() {
+ return new ColumnFamilyOptions()
+ .setMergeOperator(new StringAppendOperator());
+ }
+
},
/**
@@ -72,16 +81,22 @@ public enum PredefinedOptions {
SPINNING_DISK_OPTIMIZED {
@Override
- public Options createOptions() {
+ public DBOptions createDBOptions() {
- return new Options()
- .setCompactionStyle(CompactionStyle.LEVEL)
- .setLevelCompactionDynamicLevelBytes(true)
+ return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions() {
+ return new ColumnFamilyOptions()
+ .setMergeOperator(new StringAppendOperator())
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true);
+ }
},
/**
@@ -113,25 +128,32 @@ public enum PredefinedOptions {
SPINNING_DISK_OPTIMIZED_HIGH_MEM {
@Override
- public Options createOptions() {
+ public DBOptions createDBOptions() {
+
+ return new DBOptions()
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setDisableDataSync(true)
+ .setMaxOpenFiles(-1);
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions() {
final long blockCacheSize = 256 * 1024 * 1024;
final long blockSize = 128 * 1024;
final long targetFileSize = 256 * 1024 * 1024;
final long writeBufferSize = 64 * 1024 * 1024;
- return new Options()
+ return new ColumnFamilyOptions()
+ .setMergeOperator(new StringAppendOperator())
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setTargetFileSizeBase(targetFileSize)
.setMaxBytesForLevelBase(4 * targetFileSize)
.setWriteBufferSize(writeBufferSize)
- .setIncreaseParallelism(4)
.setMinWriteBufferNumberToMerge(3)
.setMaxWriteBufferNumber(4)
- .setUseFsync(false)
- .setDisableDataSync(true)
- .setMaxOpenFiles(-1)
.setTableFormatConfig(
new BlockBasedTableConfig()
.setBlockCacheSize(blockCacheSize)
@@ -160,21 +182,35 @@ public enum PredefinedOptions {
FLASH_SSD_OPTIMIZED {
@Override
- public Options createOptions() {
- return new Options()
+ public DBOptions createDBOptions() {
+ return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions() {
+ return new ColumnFamilyOptions()
+ .setMergeOperator(new StringAppendOperator());
+ }
};
// ------------------------------------------------------------------------
/**
- * Creates the options for this pre-defined setting.
+ * Creates the {@link DBOptions} for this pre-defined setting.
*
* @return The pre-defined options object.
*/
- public abstract Options createOptions();
+ public abstract DBOptions createDBOptions();
+
+ /**
+ * Creates the {@link org.rocksdb.ColumnFamilyOptions} for this pre-defined setting.
+ *
+ * @return The pre-defined options object.
+ */
+ public abstract ColumnFamilyOptions createColumnOptions();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 20b5181..91fa807 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
import static java.util.Objects.requireNonNull;
@@ -46,7 +43,7 @@ import static java.util.Objects.requireNonNull;
* @param <T> The type of the values that can be folded into the state.
* @param <ACC> The type of the value in the folding state.
*/
-public class RocksDBFoldingState<K, N, T, ACC>
+class RocksDBFoldingState<K, N, T, ACC>
extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
implements FoldingState<T, ACC> {
@@ -54,7 +51,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
private final TypeSerializer<ACC> valueSerializer;
/** This holds the name of the state and can create an initial default value for the state. */
- protected final FoldingStateDescriptor<T, ACC> stateDesc;
+ private final FoldingStateDescriptor<T, ACC> stateDesc;
/** User-specified fold function */
private final FoldFunction<T, ACC> foldFunction;
@@ -63,59 +60,23 @@ public class RocksDBFoldingState<K, N, T, ACC>
* We disable writes to the write-ahead-log here. We can't have these in the base class
* because JNI segfaults for some reason if they are.
*/
- protected final WriteOptions writeOptions;
+ private final WriteOptions writeOptions;
/**
* Creates a new {@code RocksDBFoldingState}.
*
- * @param keySerializer The serializer for the keys.
* @param namespaceSerializer The serializer for the namespace.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
- * @param dbPath The path on the local system where RocksDB data should be stored.
- * @param backupPath The path where to store backups.
*/
- protected RocksDBFoldingState(
- TypeSerializer<K> keySerializer,
+ RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc,
- File dbPath,
- String backupPath,
- Options options) {
-
- super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
-
- this.stateDesc = requireNonNull(stateDesc);
- this.valueSerializer = stateDesc.getSerializer();
- this.foldFunction = stateDesc.getFoldFunction();
-
- writeOptions = new WriteOptions();
- writeOptions.setDisableWAL(true);
- }
+ RocksDBStateBackend backend) {
- /**
- * Creates a {@code RocksDBFoldingState} by restoring from a directory.
- *
- * @param keySerializer The serializer for the keys.
- * @param namespaceSerializer The serializer for the namespace.
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param dbPath The path on the local system where RocksDB data should be stored.
- * @param backupPath The path where to store backups.
- * @param restorePath The path on the local file system that we are restoring from.
- */
- protected RocksDBFoldingState(
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc,
- File dbPath,
- String backupPath,
- String restorePath,
- Options options) {
-
- super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
+ super(columnFamily, namespaceSerializer, backend);
- this.stateDesc = stateDesc;
+ this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
this.foldFunction = stateDesc.getFoldFunction();
@@ -130,7 +91,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
- byte[] valueBytes = db.get(key);
+ byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return stateDesc.getDefaultValue();
}
@@ -147,65 +108,22 @@ public class RocksDBFoldingState<K, N, T, ACC>
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
- byte[] valueBytes = db.get(key);
+ byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
baos.reset();
valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
- db.put(writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
} else {
ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
ACC newValue = foldFunction.fold(oldValue, value);
baos.reset();
valueSerializer.serialize(newValue, out);
- db.put(writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
}
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}
-
- @Override
- protected AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> createRocksDBSnapshot(
- URI backupUri, long checkpointId) {
-
- return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
- }
-
- private static class Snapshot<K, N, T, ACC> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
- private static final long serialVersionUID = 1L;
-
- public Snapshot(
- File dbPath,
- String checkpointPath,
- URI backupUri,
- long checkpointId,
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) {
-
- super(dbPath,
- checkpointPath,
- backupUri,
- checkpointId,
- keySerializer,
- namespaceSerializer,
- stateDesc);
- }
-
- @Override
- protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, RocksDBStateBackend>
- createRocksDBState(
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc,
- File dbPath,
- String backupPath,
- String restorePath,
- Options options) throws Exception {
-
- return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath, options);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index f5c589c..dc55d11 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -23,17 +23,14 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -43,11 +40,15 @@ import static java.util.Objects.requireNonNull;
/**
* {@link ListState} implementation that stores state in RocksDB.
*
+ * <p>{@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the values in the list state.
*/
-public class RocksDBListState<K, N, V>
+class RocksDBListState<K, N, V>
extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>>
implements ListState<V> {
@@ -55,59 +56,27 @@ public class RocksDBListState<K, N, V>
private final TypeSerializer<V> valueSerializer;
/** This holds the name of the state and can create an initial default value for the state. */
- protected final ListStateDescriptor<V> stateDesc;
+ private final ListStateDescriptor<V> stateDesc;
/**
* We disable writes to the write-ahead-log here. We can't have these in the base class
* because JNI segfaults for some reason if they are.
*/
- protected final WriteOptions writeOptions;
+ private final WriteOptions writeOptions;
/**
* Creates a new {@code RocksDBListState}.
*
- * @param keySerializer The serializer for the keys.
- * @param namespaceSerializer The serializer for the namespace.
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param dbPath The path on the local system where RocksDB data should be stored.
- * @param backupPath The path where to store backups.
- */
- protected RocksDBListState(TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<V> stateDesc,
- File dbPath,
- String backupPath,
- Options options) {
-
- super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
- this.stateDesc = requireNonNull(stateDesc);
- this.valueSerializer = stateDesc.getSerializer();
-
- writeOptions = new WriteOptions();
- writeOptions.setDisableWAL(true);
- }
-
- /**
- * Creates a {@code RocksDBListState} by restoring from a directory.
- *
- * @param keySerializer The serializer for the keys.
* @param namespaceSerializer The serializer for the namespace.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
- * @param dbPath The path on the local system where RocksDB data should be stored.
- * @param backupPath The path where to store backups.
- * @param restorePath The path on the local file system that we are restoring from.
*/
- protected RocksDBListState(TypeSerializer<K> keySerializer,
+ RocksDBListState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
- File dbPath,
- String backupPath,
- String restorePath,
- Options options) {
+ RocksDBStateBackend backend) {
- super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
+ super(columnFamily, namespaceSerializer, backend);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
@@ -122,7 +91,7 @@ public class RocksDBListState<K, N, V>
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
- byte[] valueBytes = db.get(key);
+ byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return Collections.emptyList();
@@ -155,55 +124,11 @@ public class RocksDBListState<K, N, V>
baos.reset();
valueSerializer.serialize(value, out);
- db.merge(writeOptions, key, baos.toByteArray());
+ backend.db.merge(columnFamily, writeOptions, key, baos.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}
-
- @Override
- protected AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>> createRocksDBSnapshot(
- URI backupUri,
- long checkpointId) {
-
- return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
- }
-
- private static class Snapshot<K, N, V> extends
- AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>>
- {
- private static final long serialVersionUID = 1L;
-
- public Snapshot(File dbPath,
- String checkpointPath,
- URI backupUri,
- long checkpointId,
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<V> stateDesc) {
- super(dbPath,
- checkpointPath,
- backupUri,
- checkpointId,
- keySerializer,
- namespaceSerializer,
- stateDesc);
- }
-
- @Override
- protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<V> stateDesc,
- File basePath,
- String backupPath,
- String restorePath,
- Options options) throws Exception {
-
- return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, basePath,
- checkpointPath, restorePath, options);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 6a965cc..953660d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
import static java.util.Objects.requireNonNull;
@@ -45,7 +42,7 @@ import static java.util.Objects.requireNonNull;
* @param <N> The type of the namespace.
* @param <V> The type of value that the state stores.
*/
-public class RocksDBReducingState<K, N, V>
+class RocksDBReducingState<K, N, V>
extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
implements ReducingState<V> {
@@ -53,7 +50,7 @@ public class RocksDBReducingState<K, N, V>
private final TypeSerializer<V> valueSerializer;
/** This holds the name of the state and can create an initial default value for the state. */
- protected final ReducingStateDescriptor<V> stateDesc;
+ private final ReducingStateDescriptor<V> stateDesc;
/** User-specified reduce function */
private final ReduceFunction<V> reduceFunction;
@@ -62,26 +59,21 @@ public class RocksDBReducingState<K, N, V>
* We disable writes to the write-ahead-log here. We can't have these in the base class
* because JNI segfaults for some reason if they are.
*/
- protected final WriteOptions writeOptions;
+ private final WriteOptions writeOptions;
/**
* Creates a new {@code RocksDBReducingState}.
*
- * @param keySerializer The serializer for the keys.
* @param namespaceSerializer The serializer for the namespace.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
- * @param dbPath The path on the local system where RocksDB data should be stored.
- * @param backupPath The path where to store backups.
*/
- protected RocksDBReducingState(TypeSerializer<K> keySerializer,
+ RocksDBReducingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
- File dbPath,
- String backupPath,
- Options options) {
+ RocksDBStateBackend backend) {
- super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
+ super(columnFamily, namespaceSerializer, backend);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
this.reduceFunction = stateDesc.getReduceFunction();
@@ -90,34 +82,6 @@ public class RocksDBReducingState<K, N, V>
writeOptions.setDisableWAL(true);
}
- /**
- * Creates a {@code RocksDBReducingState} by restoring from a directory.
- *
- * @param keySerializer The serializer for the keys.
- * @param namespaceSerializer The serializer for the namespace.
- * @param stateDesc The state identifier for the state. This contains name
- * and can create a default state value.
- * @param dbPath The path on the local system where RocksDB data should be stored.
- * @param backupPath The path where to store backups.
- * @param restorePath The path on the local file system that we are restoring from.
- */
- protected RocksDBReducingState(TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<V> stateDesc,
- File dbPath,
- String backupPath,
- String restorePath,
- Options options) {
-
- super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
- this.stateDesc = stateDesc;
- this.valueSerializer = stateDesc.getSerializer();
- this.reduceFunction = stateDesc.getReduceFunction();
-
- writeOptions = new WriteOptions();
- writeOptions.setDisableWAL(true);
- }
-
@Override
public V get() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -125,7 +89,7 @@ public class RocksDBReducingState<K, N, V>
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
- byte[] valueBytes = db.get(key);
+ byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return null;
}
@@ -142,66 +106,22 @@ public class RocksDBReducingState<K, N, V>
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
- byte[] valueBytes = db.get(key);
+ byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
baos.reset();
valueSerializer.serialize(value, out);
- db.put(writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
} else {
V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
V newValue = reduceFunction.reduce(oldValue, value);
baos.reset();
valueSerializer.serialize(newValue, out);
- db.put(writeOptions, key, baos.toByteArray());
+ backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
}
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}
-
- @Override
- protected AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>> createRocksDBSnapshot(
- URI backupUri,
- long checkpointId) {
-
- return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
- }
-
- private static class Snapshot<K, N, V> extends
- AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
- {
- private static final long serialVersionUID = 1L;
-
- public Snapshot(File dbPath,
- String checkpointPath,
- URI backupUri,
- long checkpointId,
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<V> stateDesc) {
- super(dbPath,
- checkpointPath,
- backupUri,
- checkpointId,
- keySerializer,
- namespaceSerializer,
- stateDesc);
- }
-
- @Override
- protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
- TypeSerializer<K> keySerializer,
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<V> stateDesc,
- File basePath,
- String backupPath,
- String restorePath,
- Options options) throws Exception {
-
- return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,
- basePath, checkpointPath, restorePath, options);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 8f846da..3d63bd2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -17,15 +17,21 @@
package org.apache.flink.contrib.streaming.state;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -33,18 +39,40 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.rocksdb.Options;
-import org.rocksdb.StringAppendOperator;
+import org.apache.flink.streaming.util.HDFSCopyFromLocal;
+import org.apache.flink.streaming.util.HDFSCopyToLocal;
+import org.apache.hadoop.fs.FileSystem;
+import org.rocksdb.BackupEngine;
+import org.rocksdb.BackupableDBOptions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RestoreOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +95,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
-
+
+ // ------------------------------------------------------------------------
+ // Static configuration values
+ // ------------------------------------------------------------------------
/** The checkpoint directory that we copy the RocksDB backups to. */
private final Path checkpointDirectory;
@@ -75,6 +106,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
/** The state backend that stores the non-partitioned state */
private final AbstractStateBackend nonPartitionedStateBackend;
+ /** Whether we take snapshots fully asynchronously */
+ private boolean fullyAsyncBackup = false;
+
/** Operator identifier that is used to uniqueify the RocksDB storage path. */
private String operatorIdentifier;
@@ -100,8 +134,35 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private OptionsFactory optionsFactory;
/** The options from the options factory, cached */
- private transient Options rocksDbOptions;
-
+ private transient DBOptions dbOptions;
+ private transient ColumnFamilyOptions columnOptions;
+
+ // ------------------------------------------------------------------------
+ // Per operator values that are set in initializeForJob
+ // ------------------------------------------------------------------------
+
+ /** Path where this configured instance stores its data directory */
+ private transient File instanceBasePath;
+
+ /** Path where this configured instance stores its RocksDB data base */
+ private transient File instanceRocksDBPath;
+
+ /** Base path where this configured instance stores checkpoints */
+ private transient String instanceCheckpointPath;
+
+ /**
+ * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
+ * to store state. The different k/v states that we have don't each have their own RocksDB
+ * instance. They all write to this instance but to their own column family.
+ */
+ transient RocksDB db;
+
+ /**
+ * Information about the k/v states as we create them. This is used to retrieve the
+ * column family that is used for a state and also for sanity checks when restoring.
+ */
+ private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
+
// ------------------------------------------------------------------------
/**
@@ -141,15 +202,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
- public RocksDBStateBackend(
- String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-
+ public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
}
- public RocksDBStateBackend(
- URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-
+ public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
this.nonPartitionedStateBackend = requireNonNull(nonPartitionedStateBackend);
this.checkpointDirectory = FsStateBackend.validateAndNormalizeUri(checkpointDataUri);
}
@@ -201,6 +258,40 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
+
+ instanceBasePath = new File(getDbPath("dummy_state"), UUID.randomUUID().toString());
+ instanceCheckpointPath = getCheckpointPath("dummy_state");
+ instanceRocksDBPath = new File(instanceBasePath, "db");
+
+ RocksDB.loadLibrary();
+
+ if (!instanceBasePath.exists()) {
+ if (!instanceBasePath.mkdirs()) {
+ throw new RuntimeException("Could not create RocksDB data directory.");
+ }
+ }
+
+ // clean it, this will remove the last part of the path but RocksDB will recreate it
+ try {
+ if (instanceRocksDBPath.exists()) {
+ LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
+ FileUtils.deleteDirectory(instanceRocksDBPath);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+ }
+
+ List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
+ // RocksDB requires the default column family to be listed when opening with column families
+ columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+ List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+ try {
+ db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error while opening RocksDB instance.", e);
+ }
+
+ kvStateInformation = new HashMap<>();
}
@Override
@@ -209,29 +300,43 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
+ public void dispose() {
+ super.dispose();
+ nonPartitionedStateBackend.dispose();
+
+ if (this.dbOptions != null) {
+ this.dbOptions.dispose();
+ this.dbOptions = null;
+ }
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) {
+ column.f0.dispose();
+ }
+ db.dispose();
+ }
+
+ @Override
public void close() throws Exception {
nonPartitionedStateBackend.close();
- Options opt = this.rocksDbOptions;
- if (opt != null) {
- opt.dispose();
- this.rocksDbOptions = null;
+ if (this.dbOptions != null) {
+ this.dbOptions.dispose();
+ this.dbOptions = null;
}
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) {
+ column.f0.dispose();
+ }
+ db.dispose();
}
- File getDbPath(String stateName) {
+ private File getDbPath(String stateName) {
return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
}
- String getCheckpointPath(String stateName) {
+ private String getCheckpointPath(String stateName) {
return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName;
}
-
- File[] getStoragePaths() {
- return initializedDbBasePaths;
- }
-
- File getNextStoragePath() {
+
+ private File getNextStoragePath() {
int ni = nextDirectory + 1;
ni = ni >= initializedDbBasePaths.length ? 0 : ni;
nextDirectory = ni;
@@ -239,53 +344,545 @@ public class RocksDBStateBackend extends AbstractStateBackend {
return initializedDbBasePaths[ni];
}
+ /**
+ * Visible for tests.
+ */
+ public File[] getStoragePaths() {
+ return initializedDbBasePaths;
+ }
+
+ // ------------------------------------------------------------------------
+ // Snapshot and restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
+ if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
+ return new HashMap<>();
+ }
+
+ if (fullyAsyncBackup) {
+ return performFullyAsyncSnapshot(checkpointId, timestamp);
+ } else {
+ return performSemiAsyncSnapshot(checkpointId, timestamp);
+ }
+ }
+
+ /**
+ * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
+ * This backup is then asynchronously copied to the final checkpoint location.
+ */
+ private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
+ // We don't snapshot individual k/v states since everything is stored in a central
+ // RocksDB data base. Create a dummy KvStateSnapshot that holds the information about
+ // that checkpoint. We use it in injectKeyValueStateSnapshots to restore.
+
+ final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
+ final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
+
+ if (!localBackupPath.exists()) {
+ if (!localBackupPath.mkdirs()) {
+ throw new RuntimeException("Could not create local backup path " + localBackupPath);
+ }
+ }
+
+ long startTime = System.currentTimeMillis();
+
+ BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
+ // we disabled the WAL
+ backupOptions.setBackupLogFiles(false);
+ // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
+ backupOptions.setSync(false);
+
+ try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
+ // "true": flush the memtables before creating the backup
+ backupEngine.createNewBackup(db, true);
+ }
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
+
+ // make a copy in case it gets changed while performing the async snapshot
+ List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
+ kvStateInformationCopy.add(state.f1);
+ }
+ SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
+ backupUri,
+ kvStateInformationCopy,
+ checkpointId);
+
+
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
+ result.put("dummy_state", dummySnapshot);
+ return result;
+ }
+
+ /**
+ * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
+ * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
+ * location. The only synchronous part is the drawing of the {@code Snapshot} which
+ * is essentially free.
+ */
+ private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
+ // we draw a snapshot from RocksDB then iterate over all keys at that point
+ // and store them in the backup location
+
+ final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
+
+ long startTime = System.currentTimeMillis();
+
+ org.rocksdb.Snapshot snapshot = db.getSnapshot();
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
+
+ // make a copy in case it gets changed while performing the async snapshot
+ Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
+ columnFamiliesCopy.putAll(kvStateInformation);
+ FullyAsynSnapshot dummySnapshot = new FullyAsynSnapshot(db,
+ snapshot,
+ this,
+ backupUri,
+ columnFamiliesCopy,
+ checkpointId);
+
+
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
+ result.put("dummy_state", dummySnapshot);
+ return result;
+ }
+
+ @Override
+ public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws Exception {
+ if (keyValueStateSnapshots.size() == 0) {
+ return;
+ }
+
+ KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
+ if (dummyState instanceof FinalSemiAsyncSnapshot) {
+ restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
+ } else if (dummyState instanceof FinalFullyAsyncSnapshot) {
+ restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
+ } else {
+ throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
+ }
+ }
+
+ /**
+ * Restores the backend from a {@link FinalSemiAsyncSnapshot}.
+ *
+ * <p>The current RocksDB instance is disposed and its directory deleted. The backup at
+ * {@code snapshot.backupUri} is copied into {@code instanceBasePath} and restored into
+ * {@code instanceRocksDBPath} with a {@link BackupEngine}. The database is then reopened with
+ * one column family per checkpointed {@link StateDescriptor}.
+ *
+ * @param snapshot the semi-asynchronous snapshot to restore from
+ * @throws Exception propagated from copying the backup to the local file system; directory,
+ * backup-engine and RocksDB errors are rethrown as {@link RuntimeException}
+ */
+ private void restoreFromSemiAsyncSnapshot(FinalSemiAsyncSnapshot snapshot) throws Exception {
+ // This does mostly the same work as initializeForJob, we remove the existing RocksDB
+ // directory and create a new one from the backup.
+ // This must be refactored. The StateBackend should either be initialized from
+ // scratch or from a snapshot.
+
+ if (!instanceBasePath.exists()) {
+ if (!instanceBasePath.mkdirs()) {
+ throw new RuntimeException("Could not create RocksDB data directory.");
+ }
+ }
+
+ // Column family handles are tied to the database that created them. Release the handles
+ // of the instance we are about to drop *before* disposing it, so their native resources
+ // are not leaked. The map is replaced with fresh handles once the restored DB is open.
+ if (kvStateInformation != null) {
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+ column.f0.dispose();
+ }
+ }
+ db.dispose();
+
+ // clean it, this will remove the last part of the path but RocksDB will recreate it
+ try {
+ if (instanceRocksDBPath.exists()) {
+ LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
+ FileUtils.deleteDirectory(instanceRocksDBPath);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+ }
+
+ final File localBackupPath = new File(instanceBasePath, "chk-" + snapshot.checkpointId);
+
+ if (localBackupPath.exists()) {
+ try {
+ LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
+ FileUtils.deleteDirectory(localBackupPath);
+ } catch (IOException e) {
+ throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
+ }
+ }
+
+ HDFSCopyToLocal.copyToLocal(snapshot.backupUri, instanceBasePath);
+
+ try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
+ backupEngine.restoreDbFromLatestBackup(instanceRocksDBPath.getAbsolutePath(), instanceRocksDBPath.getAbsolutePath(), new RestoreOptions(true));
+ } catch (RocksDBException|IllegalArgumentException e) {
+ throw new RuntimeException("Error while restoring RocksDB state from " + localBackupPath, e);
+ } finally {
+ // the local copy is only needed for the restore itself
+ try {
+ FileUtils.deleteDirectory(localBackupPath);
+ } catch (IOException e) {
+ LOG.error("Error cleaning up local restore directory {}", localBackupPath, e);
+ }
+ }
+
+ // one column family per checkpointed state, plus the default column family
+ List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(snapshot.stateDescriptors.size() + 1);
+ for (StateDescriptor stateDescriptor: snapshot.stateDescriptors) {
+ columnFamilyDescriptors.add(new ColumnFamilyDescriptor(stateDescriptor.getName().getBytes(), getColumnOptions()));
+ }
+
+ // RocksDB requires the default column family to be listed when opening with column families
+ columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+ List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size());
+ try {
+
+ db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+ this.kvStateInformation = new HashMap<>();
+ // handles come back in descriptor order; the trailing (default) handle is not registered
+ for (int i = 0; i < snapshot.stateDescriptors.size(); i++) {
+ this.kvStateInformation.put(snapshot.stateDescriptors.get(i).getName(), new Tuple2<>(columnFamilyHandles.get(i), snapshot.stateDescriptors.get(i)));
+ }
+
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error while opening RocksDB instance.", e);
+ }
+ }
+
+ /**
+ * Restores the backend from a {@link FinalFullyAsyncSnapshot}.
+ *
+ * <p>Expected stream layout:
+ * <ul>
+ * <li>an {@code int} giving the number of column families;</li>
+ * <li>that many (mapping byte, Java-serialized {@link StateDescriptor}) pairs;</li>
+ * <li>(mapping byte, key bytes, value bytes) records until the end of the stream.</li>
+ * </ul>
+ * Column families are (re)created through {@link #getColumnFamily(StateDescriptor)}, and every
+ * record is written back with {@code db.put}.
+ *
+ * @param snapshot the fully asynchronous snapshot to restore from
+ * @throws Exception if the stream cannot be read, ends in the middle of a record, references
+ * an unknown column family, or RocksDB rejects a write
+ */
+ private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) throws Exception {
+
+ DataInputView inputView = snapshot.stateHandle.getState(userCodeClassLoader);
+
+ // clear k/v state information before filling it
+ kvStateInformation.clear();
+
+ // first get the column family mapping
+ int numColumns = inputView.readInt();
+ Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ byte mappingByte = inputView.readByte();
+
+ // State descriptors can reference user classes, so resolve classes through the
+ // user code class loader instead of the default resolution of ObjectInputStream.
+ ObjectInputStream ooIn = new ObjectInputStream(new DataInputViewStream(inputView)) {
+ @Override
+ protected Class<?> resolveClass(java.io.ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ try {
+ return Class.forName(desc.getName(), false, userCodeClassLoader);
+ } catch (ClassNotFoundException e) {
+ // fall back to the default resolution (also covers primitive types)
+ return super.resolveClass(desc);
+ }
+ }
+ };
+ StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+
+ columnFamilyMapping.put(mappingByte, stateDescriptor);
+
+ // this will fill in the k/v state information
+ getColumnFamily(stateDescriptor);
+ }
+
+ // Read key/value records until the stream ends. Running out of data is only legitimate at
+ // a record boundary; an EOF inside a record means the checkpoint is truncated. That EOF is
+ // propagated instead of silently restoring partial state.
+ while (true) {
+ byte mappingByte;
+ try {
+ mappingByte = inputView.readByte();
+ } catch (EOFException e) {
+ // expected: clean end of the checkpoint stream
+ break;
+ }
+
+ StateDescriptor stateDescriptor = columnFamilyMapping.get(mappingByte);
+ if (stateDescriptor == null) {
+ throw new IllegalStateException("Unknown column family mapping byte " + mappingByte + " in RocksDB snapshot.");
+ }
+ ColumnFamilyHandle handle = getColumnFamily(stateDescriptor);
+ byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+ byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+ db.put(handle, key, value);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Semi-asynchronous Backup Classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * Upon snapshotting the RocksDB backup is created synchronously. The asynchronous part is
+ * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()}.
+ */
+ private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+ private static final long serialVersionUID = 1L;
+ /** Local directory holding the RocksDB backup; it is deleted by {@link #materialize()}. */
+ private final File localBackupPath;
+ /** Target location the backup is copied to. */
+ private final URI backupUri;
+ /** Descriptors of the column families present when the backup was taken. */
+ private final List<StateDescriptor> stateDescriptors;
+ private final long checkpointId;
+
+ private SemiAsyncSnapshot(File localBackupPath,
+ URI backupUri,
+ List<StateDescriptor> columnFamilies,
+ long checkpointId) {
+ this.localBackupPath = localBackupPath;
+ this.backupUri = backupUri;
+ this.stateDescriptors = columnFamilies;
+ this.checkpointId = checkpointId;
+ }
+
+ /**
+ * Copies the local backup to {@link #backupUri} and returns the resulting
+ * {@link FinalSemiAsyncSnapshot}.
+ *
+ * <p>On failure the (possibly partial) remote copy is deleted on a best-effort basis. The local
+ * backup directory is removed in all cases.
+ *
+ * @throws Exception the original copy failure; a failing remote cleanup is attached to it as
+ * a suppressed exception instead of replacing it
+ */
+ @Override
+ public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
+ try {
+ long startTime = System.currentTimeMillis();
+ HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+ long endTime = System.currentTimeMillis();
+ LOG.info("RocksDB materialization from {} to {} (asynchronous part) took {} ms.", localBackupPath, backupUri, endTime - startTime);
+ return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
+ } catch (Exception e) {
+ // best-effort cleanup; must not mask the cause of the failed materialization
+ try {
+ FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+ fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
+ } catch (Exception cleanupException) {
+ e.addSuppressed(cleanupException);
+ }
+ throw e;
+ } finally {
+ FileUtils.deleteQuietly(localBackupPath);
+ }
+ }
+ }
+
+ /**
+ * Placeholder {@link KvStateSnapshot} describing the backend's single RocksDB database,
+ * produced by {@link SemiAsyncSnapshot}.
+ *
+ * <p>Besides the backup location, it keeps the {@link StateDescriptor}s of the column families
+ * that existed at snapshot time, so they can be recreated on restore. It is restored by the
+ * backend as a whole, never through {@link #restoreState}.
+ */
+ private static class FinalSemiAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+ private static final long serialVersionUID = 1L;
+
+ final URI backupUri;
+ final long checkpointId;
+ private final List<StateDescriptor> stateDescriptors;
+
+ /**
+ * @param backupUri location of the materialized RocksDB backup
+ * @param checkpointId id of the checkpoint this snapshot belongs to
+ * @param stateDescriptors descriptors of the column families present at snapshot time
+ */
+ private FinalSemiAsyncSnapshot(URI backupUri, long checkpointId, List<StateDescriptor> stateDescriptors) {
+ this.stateDescriptors = stateDescriptors;
+ this.checkpointId = checkpointId;
+ this.backupUri = backupUri;
+ }
+
+ @Override
+ public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
+ RocksDBStateBackend stateBackend,
+ TypeSerializer<Object> keySerializer,
+ ClassLoader classLoader,
+ long recoveryTimestamp) throws Exception {
+ // restoring is done by RocksDBStateBackend#injectKeyValueStateSnapshots
+ throw new RuntimeException("Should never happen.");
+ }
+
+ /** Deletes the backup directory recursively. */
+ @Override
+ public final void discardState() throws Exception {
+ backupFileSystem().delete(new org.apache.hadoop.fs.Path(backupUri), true);
+ }
+
+ /** Returns the total length of all files under the backup directory. */
+ @Override
+ public final long getStateSize() throws Exception {
+ final FileSystem fs = backupFileSystem();
+ return fs.getContentSummary(new org.apache.hadoop.fs.Path(backupUri)).getLength();
+ }
+
+ /** Returns the Hadoop file system that holds {@link #backupUri}. */
+ private FileSystem backupFileSystem() throws IOException {
+ return FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Fully asynchronous Backup Classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * This does the snapshot using a RocksDB snapshot and an iterator over all keys
+ * at the point of that snapshot.
+ *
+ * <p>The RocksDB {@link org.rocksdb.Snapshot} is handed in at construction. It is released at
+ * the end of {@link #materialize()}, or by {@link #finalize()} if materialization never runs.
+ */
+ private static class FullyAsynSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+ private static final long serialVersionUID = 1L;
+
+ /** Mapping ids are single bytes, so at most this many column families can be encoded. */
+ private static final int MAX_COLUMN_FAMILIES = 256;
+
+ private transient final RocksDB db;
+ private transient org.rocksdb.Snapshot snapshot;
+ private transient AbstractStateBackend backend;
+
+ private final URI backupUri;
+ // NOTE(review): iterated during materialize(); if this is the backend's live state map,
+ // states registered concurrently could break the iteration. Confirm a copy is passed.
+ private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies;
+ private final long checkpointId;
+
+ private FullyAsynSnapshot(RocksDB db,
+ org.rocksdb.Snapshot snapshot,
+ AbstractStateBackend backend,
+ URI backupUri,
+ Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies,
+ long checkpointId) {
+ this.db = db;
+ this.snapshot = snapshot;
+ this.backend = backend;
+ this.backupUri = backupUri;
+ this.columnFamilies = columnFamilies;
+ this.checkpointId = checkpointId;
+ }
+
+ /**
+ * Writes all key/value pairs visible in {@link #snapshot} to a checkpoint stream.
+ *
+ * <p>Stream layout:
+ * <ul>
+ * <li>an {@code int} giving the number of column families;</li>
+ * <li>per column family, a mapping byte and its Java-serialized {@link StateDescriptor};</li>
+ * <li>per key/value pair, the mapping byte of its column family, the key bytes and the value
+ * bytes.</li>
+ * </ul>
+ * The RocksDB snapshot is released when this method returns, whether or not it succeeds.
+ */
+ @Override
+ public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
+ try {
+ long startTime = System.currentTimeMillis();
+
+ // more column families than distinct byte values would silently corrupt the mapping
+ if (columnFamilies.size() > MAX_COLUMN_FAMILIES) {
+ throw new IllegalStateException("Cannot snapshot more than " + MAX_COLUMN_FAMILIES + " RocksDB column families, found " + columnFamilies.size() + ".");
+ }
+
+ CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+
+ outputView.writeInt(columnFamilies.size());
+
+ // we don't know how many key/value pairs there are in each column family.
+ // We prefix every written element with a byte that signifies to which
+ // column family it belongs, this way we can restore the column families
+ byte count = 0;
+ Map<String, Byte> columnFamilyMapping = new HashMap<>();
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
+ columnFamilyMapping.put(column.getKey(), count);
+
+ outputView.writeByte(count);
+
+ ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+ ooOut.writeObject(column.getValue().f1);
+ ooOut.flush();
+
+ count++;
+ }
+
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
+ byte columnByte = columnFamilyMapping.get(column.getKey());
+ ReadOptions readOptions = new ReadOptions();
+ RocksIterator iterator = null;
+ try {
+ readOptions.setSnapshot(snapshot);
+ iterator = db.newIterator(column.getValue().f0, readOptions);
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ outputView.writeByte(columnByte);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), outputView);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), outputView);
+ iterator.next();
+ }
+ } finally {
+ // free the native iterator/options promptly (before the snapshot is released)
+ // instead of relying on finalization
+ if (iterator != null) {
+ iterator.dispose();
+ }
+ readOptions.dispose();
+ }
+ }
+
+ StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, endTime - startTime);
+ return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+ } finally {
+ db.releaseSnapshot(snapshot);
+ snapshot = null;
+ }
+ }
+
+ /** Safety net: releases the RocksDB snapshot if {@link #materialize()} never ran. */
+ @Override
+ protected void finalize() throws Throwable {
+ if (snapshot != null) {
+ db.releaseSnapshot(snapshot);
+ }
+ super.finalize();
+ }
+ }
+
+ /**
+ * Placeholder {@link KvStateSnapshot} for the backend's single RocksDB database, produced by
+ * {@link FullyAsynSnapshot}.
+ *
+ * <p>It only wraps the handle to the written checkpoint stream; discarding and size queries
+ * are delegated to that handle. The backend restores it as a whole, never through
+ * {@link #restoreState}.
+ */
+ private static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+ private static final long serialVersionUID = 1L;
+
+ final StateHandle<DataInputView> stateHandle;
+ final long checkpointId;
+
+ /**
+ * @param stateHandle handle to the materialized checkpoint stream
+ * @param checkpointId id of the checkpoint this snapshot belongs to
+ */
+ private FinalFullyAsyncSnapshot(StateHandle<DataInputView> stateHandle, long checkpointId) {
+ this.checkpointId = checkpointId;
+ this.stateHandle = stateHandle;
+ }
+
+ @Override
+ public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
+ RocksDBStateBackend stateBackend,
+ TypeSerializer<Object> keySerializer,
+ ClassLoader classLoader,
+ long recoveryTimestamp) throws Exception {
+ // restoring is done by RocksDBStateBackend#injectKeyValueStateSnapshots
+ throw new RuntimeException("Should never happen.");
+ }
+
+ /** Discards the underlying checkpoint stream. */
+ @Override
+ public final void discardState() throws Exception {
+ this.stateHandle.discardState();
+ }
+
+ /** Reports the size of the underlying checkpoint stream. */
+ @Override
+ public final long getStateSize() throws Exception {
+ return this.stateHandle.getStateSize();
+ }
+ }
+
// ------------------------------------------------------------------------
// State factories
// ------------------------------------------------------------------------
-
+
+ /**
+ * Creates a column family handle for use with a k/v state. When restoring from a snapshot
+ * we don't restore the individual k/v states, just the global RocksDB data base and the
+ * list of column families. When a k/v state is first requested we check here whether we
+ * already have a column family for that and return it or create a new one if it doesn't exist.
+ *
+ * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
+ * that we checkpointed, i.e. is already in the map of column families.
+ */
+ private ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
+
+ Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());
+
+ if (stateInfo != null) {
+ if (!stateInfo.f1.equals(descriptor)) {
+ throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " trying access with " + descriptor);
+ }
+ return stateInfo.f0;
+ }
+
+ ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), getColumnOptions());
+
+ try {
+ ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
+ kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
+ return columnFamily;
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
+ }
+ }
+
+ /**
+ * Returns the key that the backend is currently set to; read by the k/v state implementations.
+ */
+ Object currentKey() {
+ return this.currentKey;
+ }
+
+ /**
+ * Returns the serializer for keys of this backend; read by the k/v state implementations.
+ */
+ TypeSerializer keySerializer() {
+ return this.keySerializer;
+ }
+
@Override
protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<T> stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
-
- return new RocksDBValueState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ // the state lives in its own column family of the backend's shared RocksDB instance
+ return new RocksDBValueState<>(getColumnFamily(stateDesc), namespaceSerializer, stateDesc, this);
}
@Override
protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
-
- return new RocksDBListState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ // the state lives in its own column family of the backend's shared RocksDB instance
+ return new RocksDBListState<>(getColumnFamily(stateDesc), namespaceSerializer, stateDesc, this);
}
@Override
protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
-
- return new RocksDBReducingState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ // the state lives in its own column family of the backend's shared RocksDB instance
+ return new RocksDBReducingState<>(getColumnFamily(stateDesc), namespaceSerializer, stateDesc, this);
}
@Override
protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
- return new RocksDBFoldingState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ // the state lives in its own column family of the backend's shared RocksDB instance
+ return new RocksDBFoldingState<>(getColumnFamily(stateDesc), namespaceSerializer, stateDesc, this);
}
+ // ------------------------------------------------------------------------
+ // Non-partitioned state
+ // ------------------------------------------------------------------------
+
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception {
@@ -305,6 +902,28 @@ public class RocksDBStateBackend extends AbstractStateBackend {
// ------------------------------------------------------------------------
/**
+ * Switches snapshots of the partitioned state held in RocksDB to fully asynchronous mode.
+ *
+ * <p>This mode is off by default. In the default mode, the RocksDB backup is created in a
+ * synchronous step that pauses normal element processing. The backup is then copied to the
+ * final checkpoint location asynchronously. Fully asynchronous snapshots do not pause
+ * processing, but they take longer: their cost grows linearly with the number of unique keys.
+ */
+ public void enableFullyAsyncSnapshots() {
+ fullyAsyncBackup = true;
+ }
+
+ /**
+ * Switches snapshots of the partitioned state held in RocksDB back to the default mode:
+ * a synchronous backup step, followed by an asynchronous copy. This is the default.
+ */
+ public void disableFullyAsyncSnapshots() {
+ fullyAsyncBackup = false;
+ }
+
+ /**
* Sets the path where the RocksDB local database files should be stored on the local
* file system. Setting this path overrides the default behavior, where the
* files are stored across the configured temp directories.
@@ -431,24 +1050,41 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
/**
- * Gets the RocksDB Options to be used for all RocksDB instances.
+ * Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances.
+ *
+ * <p>The options are built once and then cached:
+ * <ul>
+ * <li>they start from the configured predefined profile;</li>
+ * <li>they are then passed through the user's {@link OptionsFactory}, if one is set;</li>
+ * <li>finally, {@code createIfMissing} is forced to {@code true}.</li>
+ * </ul>
*/
- Options getRocksDBOptions() {
- if (rocksDbOptions == null) {
+ public DBOptions getDbOptions() {
+ if (dbOptions == null) {
// initial options from pre-defined profile
- Options opt = predefinedOptions.createOptions();
+ DBOptions opt = predefinedOptions.createDBOptions();
// add user-defined options, if specified
if (optionsFactory != null) {
- opt = optionsFactory.createOptions(opt);
+ opt = optionsFactory.createDBOptions(opt);
}
// add necessary default options
+ // (applied after the user factory so it cannot be switched off)
opt = opt.setCreateIfMissing(true);
- opt = opt.setMergeOperator(new StringAppendOperator());
-
- rocksDbOptions = opt;
+
+ dbOptions = opt;
+ }
+ return dbOptions;
+ }
+
+ /**
+ * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all column families.
+ *
+ * <p>The options are built once and then cached:
+ * <ul>
+ * <li>they start from the configured predefined profile;</li>
+ * <li>they are then passed through the user's {@link OptionsFactory}, if one is set;</li>
+ * <li>finally, the {@code StringAppendOperator} merge operator is forced.</li>
+ * </ul>
+ */
+ public ColumnFamilyOptions getColumnOptions() {
+ if (columnOptions == null) {
+ // initial options from pre-defined profile
+ ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
+
+ // add user-defined options, if specified
+ if (optionsFactory != null) {
+ opt = optionsFactory.createColumnOptions(opt);
+ }
+
+ // add necessary default options: force the merge operator after the user factory so
+ // that a custom factory cannot drop it.
+ // NOTE(review): presumably required by states that use RocksDB merge (e.g. list state). Confirm.
+ opt = opt.setMergeOperator(new org.rocksdb.StringAppendOperator());
+
+ columnOptions = opt;
}
- return rocksDbOptions;
+ return columnOptions;
}
}