You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/01 10:57:10 UTC
[06/11] flink git commit: [FLINK-6695] Activate strict checkstyle for
flink-statebackend-rocksDB
[FLINK-6695] Activate strict checkstyle for flink-statebackend-rocksDB
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60721e07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60721e07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60721e07
Branch: refs/heads/master
Commit: 60721e07dca50b268c0509703d69f66b03ca6d3a
Parents: a84ce0b
Author: zentol <ch...@apache.org>
Authored: Tue May 23 22:05:19 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 1 11:14:11 2017 +0200
----------------------------------------------------------------------
.../flink-statebackend-rocksdb/pom.xml | 39 ++++++++++
.../streaming/state/AbstractRocksDBState.java | 16 ++---
.../contrib/streaming/state/OptionsFactory.java | 14 ++--
.../streaming/state/PredefinedOptions.java | 28 ++++----
.../state/RocksDBAggregatingState.java | 12 ++--
.../streaming/state/RocksDBFoldingState.java | 7 +-
.../state/RocksDBKeyedStateBackend.java | 52 +++++++-------
.../streaming/state/RocksDBListState.java | 6 +-
.../streaming/state/RocksDBMapState.java | 75 ++++++++++----------
.../streaming/state/RocksDBReducingState.java | 8 +--
.../streaming/state/RocksDBStateBackend.java | 19 ++---
.../state/RocksDBStateBackendFactory.java | 6 +-
.../streaming/state/RocksDBValueState.java | 5 +-
.../streaming/state/RocksDBStateBackend.java | 8 ++-
.../state/RocksDBAggregatingStateTest.java | 5 +-
.../state/RocksDBAsyncSnapshotTest.java | 14 +---
.../streaming/state/RocksDBInitResetTest.java | 2 +-
.../streaming/state/RocksDBListStateTest.java | 4 +-
.../state/RocksDBMergeIteratorTest.java | 4 ++
.../state/RocksDBReducingStateTest.java | 8 +--
.../state/RocksDBStateBackendConfigTest.java | 7 +-
.../state/RocksDBStateBackendFactoryTest.java | 3 +
.../state/RocksDBStateBackendTest.java | 6 +-
.../state/RocksDbMultiClassLoaderTest.java | 4 +-
.../state/benchmark/RocksDBPerformanceTest.java | 16 ++---
25 files changed, 205 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index 527ca18..f3d9da5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -92,4 +92,43 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index ba7fb28..c061835 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -28,8 +28,8 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
@@ -52,19 +52,19 @@ import java.io.IOException;
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
implements InternalKvState<N>, State {
- /** Serializer for the namespace */
+ /** Serializer for the namespace. */
final TypeSerializer<N> namespaceSerializer;
- /** The current namespace, which the next value methods will refer to */
+ /** The current namespace, which the next value methods will refer to. */
private N currentNamespace;
- /** Backend that holds the actual RocksDB instance where we store state */
+ /** Backend that holds the actual RocksDB instance where we store state. */
protected RocksDBKeyedStateBackend<K> backend;
- /** The column family of this particular instance of state */
+ /** The column family of this particular instance of state. */
protected ColumnFamilyHandle columnFamily;
- /** State descriptor from which to create this state instance */
+ /** State descriptor from which to create this state instance. */
protected final SD stateDesc;
/**
@@ -110,7 +110,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
backend.db.remove(columnFamily, writeOptions, key);
- } catch (IOException|RocksDBException e) {
+ } catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while removing entry from RocksDB", e);
}
}
@@ -220,7 +220,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
value >>>= 8;
} while (value != 0);
}
-
+
protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
int keyGroup = readKeyGroup(inputView);
K key = readKey(inputStream, inputView);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 863c5da..34f7f62 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -25,18 +25,18 @@ import org.rocksdb.DBOptions;
* A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}.
* Options have to be created lazily by this factory, because the {@code Options}
* class is not serializable and holds pointers to native code.
- *
+ *
* <p>A typical pattern to use this OptionsFactory is as follows:
- *
+ *
* <h3>Java 8:</h3>
* <pre>{@code
* rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) );
* }</pre>
- *
+ *
* <h3>Java 7:</h3>
* <pre>{@code
* rocksDbBackend.setOptions(new OptionsFactory() {
- *
+ *
* public Options setOptions(Options currentOptions) {
* return currentOptions.setMaxOpenFiles(1024);
* }
@@ -49,11 +49,11 @@ public interface OptionsFactory extends java.io.Serializable {
* This method should set the additional options on top of the current options object.
* The current options object may contain pre-defined options based on flags that have
* been configured on the state backend.
- *
+ *
* <p>It is important to set the options on the current object and return the result from
* the setter methods, otherwise the pre-defined options may get lost.
- *
- * @param currentOptions The options object with the pre-defined options.
+ *
+ * @param currentOptions The options object with the pre-defined options.
* @return The options object on which the additional options are set.
*/
DBOptions createDBOptions(DBOptions currentOptions);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index 93aac85..f606131 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -25,10 +25,10 @@ import org.rocksdb.DBOptions;
import org.rocksdb.StringAppendOperator;
/**
- * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
+ * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
* The various pre-defined choices are configurations that have been empirically
* determined to be beneficial for performance under different settings.
- *
+ *
* <p>Some of these settings are based on experiments by the Flink community, some follow
* guides from the RocksDB project.
*/
@@ -37,12 +37,12 @@ public enum PredefinedOptions {
/**
* Default options for all settings, except that writes are not forced to the
* disk.
- *
+ *
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
DEFAULT {
-
+
@Override
public DBOptions createDBOptions() {
return new DBOptions()
@@ -60,11 +60,11 @@ public enum PredefinedOptions {
/**
* Pre-defined options for regular spinning hard disks.
- *
+ *
* <p>This constant configures RocksDB with some options that lead empirically
* to better performance when the machines executing the system use
* regular spinning hard disks.
- *
+ *
* <p>The following options are set:
* <ul>
* <li>setCompactionStyle(CompactionStyle.LEVEL)</li>
@@ -74,7 +74,7 @@ public enum PredefinedOptions {
* <li>setDisableDataSync(true)</li>
* <li>setMaxOpenFiles(-1)</li>
* </ul>
- *
+ *
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
@@ -121,7 +121,7 @@ public enum PredefinedOptions {
* <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li>
* <li>BlockBasedTableConfigsetBlockSize(128 KBytes)</li>
* </ul>
- *
+ *
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
@@ -161,13 +161,13 @@ public enum PredefinedOptions {
);
}
},
-
+
/**
* Pre-defined options for Flash SSDs.
*
* <p>This constant configures RocksDB with some options that lead empirically
* to better performance when the machines executing the system use SSDs.
- *
+ *
* <p>The following options are set:
* <ul>
* <li>setIncreaseParallelism(4)</li>
@@ -175,7 +175,7 @@ public enum PredefinedOptions {
* <li>setDisableDataSync(true)</li>
* <li>setMaxOpenFiles(-1)</li>
* </ul>
- *
+ *
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
@@ -196,13 +196,13 @@ public enum PredefinedOptions {
.setMergeOperator(new StringAppendOperator());
}
};
-
+
// ------------------------------------------------------------------------
/**
* Creates the {@link DBOptions}for this pre-defined setting.
- *
- * @return The pre-defined options object.
+ *
+ * @return The pre-defined options object.
*/
public abstract DBOptions createDBOptions();
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 1f306b4..fc84456 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
@@ -47,10 +47,10 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
implements InternalAggregatingState<N, T, R> {
- /** Serializer for the values */
+ /** Serializer for the values. */
private final TypeSerializer<ACC> valueSerializer;
- /** User-specified aggregation function */
+ /** User-specified aggregation function. */
private final AggregateFunction<T, ACC, R> aggFunction;
/**
@@ -64,7 +64,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
*
* @param namespaceSerializer
* The serializer for the namespace.
- * @param stateDesc
+ * @param stateDesc
* The state identifier for the state. This contains the state name and aggregation function.
*/
public RocksDBAggregatingState(
@@ -154,7 +154,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
writeKeyWithGroupAndNamespace(
keyGroup, key, source,
keySerializationStream, keySerializationDataOutputView);
-
+
final byte[] sourceKey = keySerializationStream.toByteArray();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
@@ -174,7 +174,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
- // create the target full-binary-key
+ // create the target full-binary-key
writeKeyWithGroupAndNamespace(
keyGroup, key, target,
keySerializationStream, keySerializationDataOutputView);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index d5d9fce..479565e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
@@ -47,10 +48,10 @@ public class RocksDBFoldingState<K, N, T, ACC>
extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
implements InternalFoldingState<N, T, ACC> {
- /** Serializer for the values */
+ /** Serializer for the values. */
private final TypeSerializer<ACC> valueSerializer;
- /** User-specified fold function */
+ /** User-specified fold function. */
private final FoldFunction<T, ACC> foldFunction;
/**
@@ -90,7 +91,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
return null;
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
- } catch (IOException|RocksDBException e) {
+ } catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 053c820..241c0b3 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
@@ -53,7 +54,6 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -79,6 +79,7 @@ import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
+
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -125,16 +126,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final String operatorIdentifier;
- /** The column family options from the options factory */
+ /** The column family options from the options factory. */
private final ColumnFamilyOptions columnOptions;
- /** The DB options from the options factory */
+ /** The DB options from the options factory. */
private final DBOptions dbOptions;
- /** Path where this configured instance stores its data directory */
+ /** Path where this configured instance stores its data directory. */
private final File instanceBasePath;
- /** Path where this configured instance stores its RocksDB data base */
+ /** Path where this configured instance stores its RocksDB data base. */
private final File instanceRocksDBPath;
/**
@@ -160,7 +161,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Map of state names to their corresponding restored state meta info.
*
- * TODO this map can be removed when eager-state registration is in place.
+ * <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
@@ -168,13 +169,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
- /** True if incremental checkpointing is enabled */
+ /** True if incremental checkpointing is enabled. */
private final boolean enableIncrementalCheckpointing;
- /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
+ /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
- /** The identifier of the last completed checkpoint */
+ /** The identifier of the last completed checkpoint. */
private long lastCompletedCheckpointId = -1;
private static final String SST_FILE_SUFFIX = ".sst";
@@ -711,22 +712,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final class RocksDBIncrementalSnapshotOperation<K> {
- /** The backend which we snapshot */
+ /** The backend which we snapshot. */
private final RocksDBKeyedStateBackend<K> stateBackend;
- /** Stream factory that creates the outpus streams to DFS */
+ /** Stream factory that creates the outpus streams to DFS. */
private final CheckpointStreamFactory checkpointStreamFactory;
- /** Id for the current checkpoint */
+ /** Id for the current checkpoint. */
private final long checkpointId;
- /** Timestamp for the current checkpoint */
+ /** Timestamp for the current checkpoint. */
private final long checkpointTimestamp;
- /** All sst files that were part of the last previously completed checkpoint */
+ /** All sst files that were part of the last previously completed checkpoint. */
private Set<StateHandleID> baseSstFiles;
- /** The state meta data */
+ /** The state meta data. */
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
private FileSystem backupFileSystem;
@@ -888,8 +889,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
-
-
synchronized (stateBackend.materializedSstFiles) {
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
@@ -1036,13 +1035,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
- /** Current key-groups state handle from which we restore key-groups */
+ /** Current key-groups state handle from which we restore key-groups. */
private KeyGroupsStateHandle currentKeyGroupsStateHandle;
- /** Current input stream we obtained from currentKeyGroupsStateHandle */
+ /** Current input stream we obtained from currentKeyGroupsStateHandle. */
private FSDataInputStream currentStateHandleInStream;
- /** Current data input view that wraps currentStateHandleInStream */
+ /** Current data input view that wraps currentStateHandleInStream. */
private DataInputView currentStateHandleInView;
- /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle */
+ /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
/**
@@ -1082,7 +1081,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
/**
- * Restore one key groups state handle
+ * Restore one key groups state handle.
*
* @throws IOException
* @throws RocksDBException
@@ -1105,7 +1104,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
/**
- * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle
+ * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
*
* @throws IOException
* @throws ClassNotFoundException
@@ -1169,7 +1168,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
/**
- * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle
+ * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
*
* @throws IOException
* @throws RocksDBException
@@ -1376,7 +1375,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
- startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+ startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
}
iterator.seek(startKeyGroupPrefixBytes);
@@ -1430,7 +1429,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyHandle, stateMetaInfo));
}
-
// use the restore sst files as the base for succeeding checkpoints
synchronized (stateBackend.materializedSstFiles) {
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
@@ -1480,7 +1478,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
- if (! (rawStateHandle instanceof IncrementalKeyedStateHandle)) {
+ if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
"expected " + IncrementalKeyedStateHandle.class +
", but found " + rawStateHandle.getClass());
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index a8b20d1..9d3e97e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -50,7 +50,7 @@ public class RocksDBListState<K, N, V>
extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
implements InternalListState<N, V> {
- /** Serializer for the values */
+ /** Serializer for the values. */
private final TypeSerializer<V> valueSerializer;
/**
@@ -100,7 +100,7 @@ public class RocksDBListState<K, N, V>
}
}
return result;
- } catch (IOException|RocksDBException e) {
+ } catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB", e);
}
}
@@ -131,7 +131,7 @@ public class RocksDBListState<K, N, V>
final int keyGroup = backend.getCurrentKeyGroupIndex();
try {
- // create the target full-binary-key
+ // create the target full-binary-key
writeKeyWithGroupAndNamespace(
keyGroup, key, target,
keySerializationStream, keySerializationDataOutputView);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5125240..75c1651 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;
+
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -45,7 +46,7 @@ import java.util.Map;
/**
* {@link MapState} implementation that stores state in RocksDB.
- * <p>
+ *
* <p>{@link RocksDBStateBackend} must ensure that we set the
* {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
* we use the {@code merge()} call.
@@ -58,10 +59,10 @@ import java.util.Map;
public class RocksDBMapState<K, N, UK, UV>
extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
implements InternalMapState<N, UK, UV> {
-
- private static Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
- /** Serializer for the keys and values */
+ private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
+
+ /** Serializer for the keys and values. */
private final TypeSerializer<UK> userKeySerializer;
private final TypeSerializer<UV> userValueSerializer;
@@ -105,19 +106,19 @@ public class RocksDBMapState<K, N, UK, UV>
@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
-
+
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
byte[] rawValueBytes = serializeUserValue(userValue);
backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
-
+
@Override
public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
if (map == null) {
return;
}
-
+
for (Map.Entry<UK, UV> entry : map.entrySet()) {
put(entry.getKey(), entry.getValue());
}
@@ -137,7 +138,7 @@ public class RocksDBMapState<K, N, UK, UV>
return (rawValueBytes != null);
}
-
+
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
final Iterator<Map.Entry<UK, UV>> iterator = iterator();
@@ -158,7 +159,7 @@ public class RocksDBMapState<K, N, UK, UV>
@Override
public Iterable<UK> keys() throws IOException, RocksDBException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
-
+
return new Iterable<UK>() {
@Override
public Iterator<UK> iterator() {
@@ -176,7 +177,7 @@ public class RocksDBMapState<K, N, UK, UV>
@Override
public Iterable<UV> values() throws IOException, RocksDBException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
-
+
return new Iterable<UV>() {
@Override
public Iterator<UV> iterator() {
@@ -202,7 +203,7 @@ public class RocksDBMapState<K, N, UK, UV>
}
};
}
-
+
@Override
public void clear() {
try {
@@ -216,7 +217,7 @@ public class RocksDBMapState<K, N, UK, UV>
LOG.warn("Error while cleaning the state.", e);
}
}
-
+
@Override
@SuppressWarnings("unchecked")
public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
@@ -229,7 +230,7 @@ public class RocksDBMapState<K, N, UK, UV>
namespaceSerializer);
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-
+
ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView);
@@ -246,7 +247,7 @@ public class RocksDBMapState<K, N, UK, UV>
if (!iterator.hasNext()) {
return null;
}
-
+
return KvStateRequestSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() {
@Override
public Iterator<Map.Entry<UK, UV>> iterator() {
@@ -254,21 +255,21 @@ public class RocksDBMapState<K, N, UK, UV>
}
}, userKeySerializer, userValueSerializer);
}
-
+
// ------------------------------------------------------------------------
// Serialization Methods
// ------------------------------------------------------------------------
-
+
private byte[] serializeCurrentKeyAndNamespace() throws IOException {
writeCurrentKeyWithGroupAndNamespace();
-
+
return keySerializationStream.toByteArray();
}
private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
writeCurrentKeyWithGroupAndNamespace();
userKeySerializer.serialize(userKey, keySerializationDataOutputView);
-
+
return keySerializationStream.toByteArray();
}
@@ -282,7 +283,6 @@ public class RocksDBMapState<K, N, UK, UV>
userValueSerializer.serialize(userValue, keySerializationDataOutputView);
}
-
return keySerializationStream.toByteArray();
}
@@ -291,7 +291,7 @@ public class RocksDBMapState<K, N, UK, UV>
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
readKeyWithGroupAndNamespace(bais, in);
-
+
return userKeySerializer.deserialize(in);
}
@@ -303,20 +303,20 @@ public class RocksDBMapState<K, N, UK, UV>
return isNull ? null : userValueSerializer.deserialize(in);
}
-
+
// ------------------------------------------------------------------------
// Internal Classes
// ------------------------------------------------------------------------
-
- /** A map entry in RocksDBMapState */
+
+ /** A map entry in RocksDBMapState. */
private class RocksDBMapEntry implements Map.Entry<UK, UV> {
private final RocksDB db;
-
+
/** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB
* with the format #KeyGroup#Key#Namespace#UserKey. */
private final byte[] rawKeyBytes;
-
- /** The raw bytes of the value stored in RocksDB */
+
+ /** The raw bytes of the value stored in RocksDB. */
private byte[] rawValueBytes;
/** True if the entry has been deleted. */
@@ -329,7 +329,7 @@ public class RocksDBMapState<K, N, UK, UV>
RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) {
this.db = db;
-
+
this.rawKeyBytes = rawKeyBytes;
this.rawValueBytes = rawValueBytes;
this.deleted = false;
@@ -383,7 +383,7 @@ public class RocksDBMapState<K, N, UK, UV>
}
UV oldValue = getValue();
-
+
try {
userValue = value;
rawValueBytes = serializeUserValue(value);
@@ -400,22 +400,22 @@ public class RocksDBMapState<K, N, UK, UV>
/** An auxiliary utility to scan all entries under the given key. */
private abstract class RocksDBMapIterator<T> implements Iterator<T> {
- final static int CACHE_SIZE_BASE = 1;
- final static int CACHE_SIZE_LIMIT = 128;
+ static final int CACHE_SIZE_BASE = 1;
+ static final int CACHE_SIZE_LIMIT = 128;
/** The db where data resides. */
private final RocksDB db;
- /**
+ /**
* The prefix bytes of the key being accessed. All entries under the same key
* has the same prefix, hence we can stop the iterating once coming across an
- * entry with a different prefix.
+ * entry with a different prefix.
*/
private final byte[] keyPrefixBytes;
/**
* True if all entries have been accessed or the iterator has come across an
- * entry with a different prefix.
+ * entry with a different prefix.
*/
private boolean expired = false;
@@ -423,7 +423,6 @@ public class RocksDBMapState<K, N, UK, UV>
private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
private int cacheIndex = 0;
-
RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) {
this.db = db;
this.keyPrefixBytes = keyPrefixBytes;
@@ -440,7 +439,7 @@ public class RocksDBMapState<K, N, UK, UV>
public void remove() {
if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
throw new IllegalStateException("The remove operation must be called after an valid next operation.");
- }
+ }
RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
lastEntry.remove();
@@ -489,7 +488,7 @@ public class RocksDBMapState<K, N, UK, UV>
iterator.seek(startBytes);
- /*
+ /*
* If the last returned entry is not deleted, it will be the first entry in the
* iterating. Skip it to avoid redundant access in such cases.
*/
@@ -515,7 +514,7 @@ public class RocksDBMapState<K, N, UK, UV>
iterator.close();
}
-
+
private boolean underSameKey(byte[] rawKeyBytes) {
if (rawKeyBytes.length < keyPrefixBytes.length) {
return false;
@@ -530,4 +529,4 @@ public class RocksDBMapState<K, N, UK, UV>
return true;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index ccc98a7..b5fe95f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -46,10 +46,10 @@ public class RocksDBReducingState<K, N, V>
extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V>
implements InternalReducingState<N, V> {
- /** Serializer for the values */
+ /** Serializer for the values. */
private final TypeSerializer<V> valueSerializer;
- /** User-specified reduce function */
+ /** User-specified reduce function. */
private final ReduceFunction<V> reduceFunction;
/**
@@ -88,7 +88,7 @@ public class RocksDBReducingState<K, N, V>
return null;
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
- } catch (IOException|RocksDBException e) {
+ } catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB", e);
}
}
@@ -157,7 +157,7 @@ public class RocksDBReducingState<K, N, V>
// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
- // create the target full-binary-key
+ // create the target full-binary-key
writeKeyWithGroupAndNamespace(
keyGroup, key, target,
keySerializationStream, keySerializationDataOutputView);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 2b70dcd..4a30489 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.util.AbstractID;
+
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.NativeLibraryLoader;
@@ -69,10 +70,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
- /** The number of (re)tries for loading the RocksDB JNI library */
+ /** The number of (re)tries for loading the RocksDB JNI library. */
private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
-
+
private static boolean rocksDbInitialized = false;
// ------------------------------------------------------------------------
@@ -93,23 +94,23 @@ public class RocksDBStateBackend extends AbstractStateBackend {
/** Base paths for RocksDB directory, as configured. May be null. */
private Path[] configuredDbBasePaths;
- /** Base paths for RocksDB directory, as initialized */
+ /** Base paths for RocksDB directory, as initialized. */
private File[] initializedDbBasePaths;
private int nextDirectory;
// RocksDB options
- /** The pre-configured option settings */
+ /** The pre-configured option settings. */
private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
- /** The options factory to create the RocksDB options in the cluster */
+ /** The options factory to create the RocksDB options in the cluster. */
private OptionsFactory optionsFactory;
/** Whether we already lazily initialized our local storage directories. */
private transient boolean isInitialized = false;
- /** True if incremental checkpointing is enabled */
+ /** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;
@@ -183,10 +184,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
* checkpoint data streams. Typically, one would supply a filesystem or database state backend
* here where the snapshots from RocksDB would be stored.
- *
+ *
* <p>The snapshots of the RocksDB state will be stored using the given backend's
- * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
- *
+ * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
+ *
* @param checkpointStreamBackend The backend to store the
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index bd9bcaa..f0569b8 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -39,13 +39,13 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBSt
private static final long serialVersionUID = 4906988360901930371L;
- /** The key under which the config stores the directory where checkpoints should be stored */
+ /** The key under which the config stores the directory where checkpoints should be stored. */
public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
- /** The key under which the config stores the directory where RocksDB should be stored */
+ /** The key under which the config stores the directory where RocksDB should be stored. */
public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir";
@Override
- public RocksDBStateBackend createFromConfig(Configuration config)
+ public RocksDBStateBackend createFromConfig(Configuration config)
throws IllegalConfigurationException, IOException {
final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index b2a4fba..da21e8a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalValueState;
+
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
@@ -42,7 +43,7 @@ public class RocksDBValueState<K, N, V>
extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V>
implements InternalValueState<N, V> {
- /** Serializer for the values */
+ /** Serializer for the values. */
private final TypeSerializer<V> valueSerializer;
/**
@@ -80,7 +81,7 @@ public class RocksDBValueState<K, N, V>
return stateDesc.getDefaultValue();
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
- } catch (IOException|RocksDBException e) {
+ } catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB.", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
index 695aa12..024d12e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -68,6 +68,12 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
}
+ /**
+ * This class exists to provide a good error message if a user attempts to restore from a semi async snapshot.
+ *
+ * <p>see FLINK-5468
+ */
+ @Deprecated
public static class FinalSemiAsyncSnapshot {
static {
@@ -75,7 +81,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
private static void throwExceptionOnLoadingThisClass() {
- throw new RuntimeException("Attempt to requiresMigration RocksDB state created with semi async snapshot mode failed. "
+ throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. "
+ "Unfortunately, this is not supported. Please create a new savepoint for the job using fully "
+ "async mode in Flink 1.1 and run migration again with the new savepoint.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
index 1b65466..f3065ab 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
@@ -36,8 +36,9 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static java.util.Arrays.asList;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
/**
* Tests for the {@link InternalAggregatingState} implementation on top of RocksDB.
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 812babb..d2edf0e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -61,12 +60,10 @@ import org.apache.flink.util.FutureUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
-
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -408,7 +405,7 @@ public class RocksDBAsyncSnapshotTest {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- if(closed) {
+ if (closed) {
throw new IOException("Stream closed.");
}
super.write(b);
@@ -422,7 +419,7 @@ public class RocksDBAsyncSnapshotTest {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- if(closed) {
+ if (closed) {
throw new IOException("Stream closed.");
}
super.write(b, off, len);
@@ -439,7 +436,7 @@ public class RocksDBAsyncSnapshotTest {
}
}
- public static class AsyncCheckpointOperator
+ private static class AsyncCheckpointOperator
extends AbstractStreamOperator<String>
implements OneInputStreamOperator<String, String>, StreamCheckpointedOperator {
@@ -480,9 +477,4 @@ public class RocksDBAsyncSnapshotTest {
}
}
-
- public static class DummyMapFunction<T> implements MapFunction<T, T> {
- @Override
- public T map(T value) { return value; }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
index 7343b56..565f27d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.contrib.streaming.state;
import org.junit.Test;
/**
- * This test checks that the RocksDB native code loader still responds to resetting the
+ * This test checks that the RocksDB native code loader still responds to resetting the init flag.
*/
public class RocksDBInitResetTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
index e7efcfa..c6ccd5d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
@@ -64,9 +64,9 @@ public class RocksDBListStateTest {
backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
+
try {
- InternalListState<VoidNamespace, Long> state =
+ InternalListState<VoidNamespace, Long> state =
keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
state.setCurrentNamespace(VoidNamespace.INSTANCE);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index f5bcf86..1d14f6e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -38,6 +39,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
+/**
+ * Tests for the RocksDBMergeIterator.
+ */
public class RocksDBMergeIteratorTest {
private static final int NUM_KEY_VAL_STATES = 50;
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
index a8b4535..0733dce 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
@@ -54,7 +54,7 @@ public class RocksDBReducingStateTest {
@Test
public void testAddAndGet() throws Exception {
- final ReducingStateDescriptor<Long> stateDescr =
+ final ReducingStateDescriptor<Long> stateDescr =
new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
@@ -62,9 +62,9 @@ public class RocksDBReducingStateTest {
backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
+
try {
- InternalReducingState<VoidNamespace, Long> state =
+ InternalReducingState<VoidNamespace, Long> state =
keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
state.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -126,7 +126,7 @@ public class RocksDBReducingStateTest {
final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
try {
- final InternalReducingState<TimeWindow, Long> state =
+ final InternalReducingState<TimeWindow, Long> state =
keyedBackend.createReducingState(new TimeWindow.Serializer(), stateDescr);
// populate the different namespaces
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 463dd44..ff433ad 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.contrib.streaming.state;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -32,6 +31,8 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+
+import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -59,7 +60,7 @@ import static org.mockito.Mockito.when;
/**
- * Tests for configuring the RocksDB State Backend
+ * Tests for configuring the RocksDB State Backend.
*/
@SuppressWarnings("serial")
public class RocksDBStateBackendConfigTest {
@@ -102,7 +103,6 @@ public class RocksDBStateBackendConfigTest {
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());
-
File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
@@ -158,7 +158,6 @@ public class RocksDBStateBackendConfigTest {
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());
-
File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 9eb662a..5a937c4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -22,6 +22,9 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the RocksDBStateBackendFactory.
+ */
public class RocksDBStateBackendFactoryTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 8d0db69..8b44a47 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.contrib.streaming.state;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
@@ -42,6 +40,9 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -372,7 +373,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
ValueState<String> state =
backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index c53fa3e..4ec6532 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -27,7 +27,7 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotEquals;
/**
* This test validates that the RocksDB JNI library loading works properly
@@ -60,7 +60,7 @@ public class RocksDbMultiClassLoaderTest {
final String tempDir = tmp.newFolder().getAbsolutePath();
- final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
+ final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
final Method meth2 = clazz2.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
meth1.setAccessible(true);
meth2.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 7147583..3231e96 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-
import org.rocksdb.CompactionStyle;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.Options;
@@ -47,7 +46,7 @@ import java.util.Arrays;
public class RocksDBPerformanceTest extends TestLogger {
@Rule
- public final TemporaryFolder TMP = new TemporaryFolder();
+ public final TemporaryFolder tmp = new TemporaryFolder();
@Rule
public final RetryRule retry = new RetryRule();
@@ -55,7 +54,7 @@ public class RocksDBPerformanceTest extends TestLogger {
@Test(timeout = 2000)
@RetryOnFailure(times = 3)
public void testRocksDbMergePerformance() throws Exception {
- final File rocksDir = TMP.newFolder();
+ final File rocksDir = tmp.newFolder();
// ensure the RocksDB library is loaded to a distinct location each retry
NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
@@ -83,8 +82,8 @@ public class RocksDBPerformanceTest extends TestLogger {
.setSync(false)
.setDisableWAL(true);
- final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
- {
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
+
// ----- insert -----
log.info("begin insert");
@@ -133,7 +132,7 @@ public class RocksDBPerformanceTest extends TestLogger {
@Test(timeout = 2000)
@RetryOnFailure(times = 3)
public void testRocksDbRangeGetPerformance() throws Exception {
- final File rocksDir = TMP.newFolder();
+ final File rocksDir = tmp.newFolder();
// ensure the RocksDB library is loaded to a distinct location each retry
NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
@@ -161,8 +160,8 @@ public class RocksDBPerformanceTest extends TestLogger {
.setSync(false)
.setDisableWAL(true);
- final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
- {
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
+
final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
final Unsafe unsafe = MemoryUtils.UNSAFE;
@@ -205,7 +204,6 @@ public class RocksDBPerformanceTest extends TestLogger {
}
}
-
private static boolean samePrefix(byte[] prefix, byte[] key) {
for (int i = 0; i < prefix.length; i++) {
if (prefix[i] != key [i]) {