Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/01 10:57:10 UTC

[06/11] flink git commit: [FLINK-6695] Activate strict checkstyle for flink-statebackend-rocksDB

[FLINK-6695] Activate strict checkstyle for flink-statebackend-rocksDB


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60721e07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60721e07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60721e07

Branch: refs/heads/master
Commit: 60721e07dca50b268c0509703d69f66b03ca6d3a
Parents: a84ce0b
Author: zentol <ch...@apache.org>
Authored: Tue May 23 22:05:19 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 1 11:14:11 2017 +0200

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          | 39 ++++++++++
 .../streaming/state/AbstractRocksDBState.java   | 16 ++---
 .../contrib/streaming/state/OptionsFactory.java | 14 ++--
 .../streaming/state/PredefinedOptions.java      | 28 ++++----
 .../state/RocksDBAggregatingState.java          | 12 ++--
 .../streaming/state/RocksDBFoldingState.java    |  7 +-
 .../state/RocksDBKeyedStateBackend.java         | 52 +++++++-------
 .../streaming/state/RocksDBListState.java       |  6 +-
 .../streaming/state/RocksDBMapState.java        | 75 ++++++++++----------
 .../streaming/state/RocksDBReducingState.java   |  8 +--
 .../streaming/state/RocksDBStateBackend.java    | 19 ++---
 .../state/RocksDBStateBackendFactory.java       |  6 +-
 .../streaming/state/RocksDBValueState.java      |  5 +-
 .../streaming/state/RocksDBStateBackend.java    |  8 ++-
 .../state/RocksDBAggregatingStateTest.java      |  5 +-
 .../state/RocksDBAsyncSnapshotTest.java         | 14 +---
 .../streaming/state/RocksDBInitResetTest.java   |  2 +-
 .../streaming/state/RocksDBListStateTest.java   |  4 +-
 .../state/RocksDBMergeIteratorTest.java         |  4 ++
 .../state/RocksDBReducingStateTest.java         |  8 +--
 .../state/RocksDBStateBackendConfigTest.java    |  7 +-
 .../state/RocksDBStateBackendFactoryTest.java   |  3 +
 .../state/RocksDBStateBackendTest.java          |  6 +-
 .../state/RocksDbMultiClassLoaderTest.java      |  4 +-
 .../state/benchmark/RocksDBPerformanceTest.java | 16 ++---
 25 files changed, 205 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index 527ca18..f3d9da5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -92,4 +92,43 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 	</dependencies>
+	
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
 </project>
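
As a rough illustration of the conventions this ruleset enforces (this snippet is not part of the commit): Javadoc sentences end with a period, multi-catch types are separated by spaced pipes, and org.rocksdb imports form their own group after the Flink imports, matching the fixes in the hunks below.

    // Illustrative only -- a minimal class written in the style the strict checkstyle accepts.
    package org.apache.flink.contrib.streaming.state;

    import org.apache.flink.util.Preconditions;

    import org.rocksdb.RocksDBException;

    import java.io.IOException;

    /** Example of the Javadoc and catch formatting required by the ruleset. */
    final class StyleExample {

        private StyleExample() {
        }

        static void remove(byte[] key) {
            Preconditions.checkNotNull(key);
            try {
                mayFail(key);
            } catch (IOException | RocksDBException e) {
                throw new RuntimeException("Error while removing entry from RocksDB", e);
            }
        }

        private static void mayFail(byte[] key) throws IOException, RocksDBException {
            // Placeholder so the multi-catch above compiles.
        }
    }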

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index ba7fb28..c061835 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -28,8 +28,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -52,19 +52,19 @@ import java.io.IOException;
 public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
 		implements InternalKvState<N>, State {
 
-	/** Serializer for the namespace */
+	/** Serializer for the namespace. */
 	final TypeSerializer<N> namespaceSerializer;
 
-	/** The current namespace, which the next value methods will refer to */
+	/** The current namespace, which the next value methods will refer to. */
 	private N currentNamespace;
 
-	/** Backend that holds the actual RocksDB instance where we store state */
+	/** Backend that holds the actual RocksDB instance where we store state. */
 	protected RocksDBKeyedStateBackend<K> backend;
 
-	/** The column family of this particular instance of state */
+	/** The column family of this particular instance of state. */
 	protected ColumnFamilyHandle columnFamily;
 
-	/** State descriptor from which to create this state instance */
+	/** State descriptor from which to create this state instance. */
 	protected final SD stateDesc;
 
 	/**
@@ -110,7 +110,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 			writeCurrentKeyWithGroupAndNamespace();
 			byte[] key = keySerializationStream.toByteArray();
 			backend.db.remove(columnFamily, writeOptions, key);
-		} catch (IOException|RocksDBException e) {
+		} catch (IOException | RocksDBException e) {
 			throw new RuntimeException("Error while removing entry from RocksDB", e);
 		}
 	}
@@ -220,7 +220,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 			value >>>= 8;
 		} while (value != 0);
 	}
-	
+
 	protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
 		int keyGroup = readKeyGroup(inputView);
 		K key = readKey(inputStream, inputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 863c5da..34f7f62 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -25,18 +25,18 @@ import org.rocksdb.DBOptions;
  * A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}.
  * Options have to be created lazily by this factory, because the {@code Options}
  * class is not serializable and holds pointers to native code.
- * 
+ *
  * <p>A typical pattern to use this OptionsFactory is as follows:
- * 
+ *
  * <h3>Java 8:</h3>
  * <pre>{@code
  * rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) );
  * }</pre>
- * 
+ *
  * <h3>Java 7:</h3>
  * <pre>{@code
  * rocksDbBackend.setOptions(new OptionsFactory() {
- *     
+ *
  *     public Options setOptions(Options currentOptions) {
  *         return currentOptions.setMaxOpenFiles(1024);
  *     }
@@ -49,11 +49,11 @@ public interface OptionsFactory extends java.io.Serializable {
 	 * This method should set the additional options on top of the current options object.
 	 * The current options object may contain pre-defined options based on flags that have
 	 * been configured on the state backend.
-	 * 
+	 *
 	 * <p>It is important to set the options on the current object and return the result from
 	 * the setter methods, otherwise the pre-defined options may get lost.
-	 * 
-	 * @param currentOptions The options object with the pre-defined options. 
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
 	 * @return The options object on which the additional options are set.
 	 */
 	DBOptions createDBOptions(DBOptions currentOptions);
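
To show the intended wiring in one place, here is the Java 7 pattern from the Javadoc above expanded into a compilable sketch. The checkpoint URI is a placeholder, and createColumnOptions is assumed from the full interface; only createDBOptions appears in this hunk.

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class ConfigureRocksDBOptions {

        public static void main(String[] args) throws Exception {
            RocksDBStateBackend rocksDbBackend =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints");

            rocksDbBackend.setOptions(new OptionsFactory() {

                @Override
                public DBOptions createDBOptions(DBOptions currentOptions) {
                    // Set the extra options on the current object and return it,
                    // so the pre-defined options configured on the backend are kept.
                    return currentOptions.setMaxOpenFiles(1024);
                }

                @Override
                public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                    // Assumed second factory method (not shown in this hunk);
                    // returning the options unchanged keeps the defaults.
                    return currentOptions;
                }
            });
        }
    }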

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index 93aac85..f606131 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -25,10 +25,10 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.StringAppendOperator;
 
 /**
- * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. 
+ * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
  * The various pre-defined choices are configurations that have been empirically
  * determined to be beneficial for performance under different settings.
- * 
+ *
  * <p>Some of these settings are based on experiments by the Flink community, some follow
  * guides from the RocksDB project.
  */
@@ -37,12 +37,12 @@ public enum PredefinedOptions {
 	/**
 	 * Default options for all settings, except that writes are not forced to the
 	 * disk.
-	 * 
+	 *
 	 * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
 	 * there is no need to sync data to stable storage.
 	 */
 	DEFAULT {
-		
+
 		@Override
 		public DBOptions createDBOptions() {
 			return new DBOptions()
@@ -60,11 +60,11 @@ public enum PredefinedOptions {
 
 	/**
 	 * Pre-defined options for regular spinning hard disks.
-	 * 
+	 *
 	 * <p>This constant configures RocksDB with some options that lead empirically
 	 * to better performance when the machines executing the system use
 	 * regular spinning hard disks.
-	 * 
+	 *
 	 * <p>The following options are set:
 	 * <ul>
 	 *     <li>setCompactionStyle(CompactionStyle.LEVEL)</li>
@@ -74,7 +74,7 @@ public enum PredefinedOptions {
 	 *     <li>setDisableDataSync(true)</li>
 	 *     <li>setMaxOpenFiles(-1)</li>
 	 * </ul>
-	 * 
+	 *
 	 * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
 	 * there is no need to sync data to stable storage.
 	 */
@@ -121,7 +121,7 @@ public enum PredefinedOptions {
 	 *     <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li>
 	 *     <li>BlockBasedTableConfigsetBlockSize(128 KBytes)</li>
 	 * </ul>
-	 * 
+	 *
 	 * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
 	 * there is no need to sync data to stable storage.
 	 */
@@ -161,13 +161,13 @@ public enum PredefinedOptions {
 					);
 		}
 	},
-	
+
 	/**
 	 * Pre-defined options for Flash SSDs.
 	 *
 	 * <p>This constant configures RocksDB with some options that lead empirically
 	 * to better performance when the machines executing the system use SSDs.
-	 * 
+	 *
 	 * <p>The following options are set:
 	 * <ul>
 	 *     <li>setIncreaseParallelism(4)</li>
@@ -175,7 +175,7 @@ public enum PredefinedOptions {
 	 *     <li>setDisableDataSync(true)</li>
 	 *     <li>setMaxOpenFiles(-1)</li>
 	 * </ul>
-	 * 
+	 *
 	 * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
 	 * there is no need to sync data to stable storage.
 	 */
@@ -196,13 +196,13 @@ public enum PredefinedOptions {
 					.setMergeOperator(new StringAppendOperator());
 		}
 	};
-	
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates the {@link DBOptions}for this pre-defined setting.
-	 * 
-	 * @return The pre-defined options object. 
+	 *
+	 * @return The pre-defined options object.
 	 */
 	public abstract DBOptions createDBOptions();
 

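For completeness, a sketch of how one of these profiles is selected; the setPredefinedOptions method and the SPINNING_DISK_OPTIMIZED constant name are assumptions consistent with the spinning-disk profile documented above, not taken from this hunk.

    // Illustrative fragment: choose the spinning-disk profile; an OptionsFactory
    // set afterwards (see OptionsFactory above) receives these options as its base.
    RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
    rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
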
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 1f306b4..fc84456 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -47,10 +47,10 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 	extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
 	implements InternalAggregatingState<N, T, R> {
 
-	/** Serializer for the values */
+	/** Serializer for the values. */
 	private final TypeSerializer<ACC> valueSerializer;
 
-	/** User-specified aggregation function */
+	/** User-specified aggregation function. */
 	private final AggregateFunction<T, ACC, R> aggFunction;
 
 	/**
@@ -64,7 +64,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 	 *
 	 * @param namespaceSerializer
 	 *             The serializer for the namespace.
-	 * @param stateDesc              
+	 * @param stateDesc
 	 *             The state identifier for the state. This contains the state name and aggregation function.
 	 */
 	public RocksDBAggregatingState(
@@ -154,7 +154,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 					writeKeyWithGroupAndNamespace(
 							keyGroup, key, source,
 							keySerializationStream, keySerializationDataOutputView);
-					
+
 					final byte[] sourceKey = keySerializationStream.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 
@@ -174,7 +174,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
-				// create the target full-binary-key 
+				// create the target full-binary-key
 				writeKeyWithGroupAndNamespace(
 						keyGroup, key, target,
 						keySerializationStream, keySerializationDataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index d5d9fce..479565e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -47,10 +48,10 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
 	implements InternalFoldingState<N, T, ACC> {
 
-	/** Serializer for the values */
+	/** Serializer for the values. */
 	private final TypeSerializer<ACC> valueSerializer;
 
-	/** User-specified fold function */
+	/** User-specified fold function. */
 	private final FoldFunction<T, ACC> foldFunction;
 
 	/**
@@ -90,7 +91,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 				return null;
 			}
 			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-		} catch (IOException|RocksDBException e) {
+		} catch (IOException | RocksDBException e) {
 			throw new RuntimeException("Error while retrieving data from RocksDB", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 053c820..241c0b3 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
@@ -53,7 +54,6 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -79,6 +79,7 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -125,16 +126,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private final String operatorIdentifier;
 
-	/** The column family options from the options factory */
+	/** The column family options from the options factory. */
 	private final ColumnFamilyOptions columnOptions;
 
-	/** The DB options from the options factory */
+	/** The DB options from the options factory. */
 	private final DBOptions dbOptions;
 
-	/** Path where this configured instance stores its data directory */
+	/** Path where this configured instance stores its data directory. */
 	private final File instanceBasePath;
 
-	/** Path where this configured instance stores its RocksDB data base */
+	/** Path where this configured instance stores its RocksDB data base. */
 	private final File instanceRocksDBPath;
 
 	/**
@@ -160,7 +161,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Map of state names to their corresponding restored state meta info.
 	 *
-	 * TODO this map can be removed when eager-state registration is in place.
+	 * <p>TODO this map can be removed when eager-state registration is in place.
 	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
 	 */
 	private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
@@ -168,13 +169,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** Number of bytes required to prefix the key groups. */
 	private final int keyGroupPrefixBytes;
 
-	/** True if incremental checkpointing is enabled */
+	/** True if incremental checkpointing is enabled. */
 	private final boolean enableIncrementalCheckpointing;
 
-	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
+	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
 	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
 
-	/** The identifier of the last completed checkpoint */
+	/** The identifier of the last completed checkpoint. */
 	private long lastCompletedCheckpointId = -1;
 
 	private static final String SST_FILE_SUFFIX = ".sst";
@@ -711,22 +712,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final class RocksDBIncrementalSnapshotOperation<K> {
 
-		/** The backend which we snapshot */
+		/** The backend which we snapshot. */
 		private final RocksDBKeyedStateBackend<K> stateBackend;
 
-		/** Stream factory that creates the outpus streams to DFS */
+		/** Stream factory that creates the outpus streams to DFS. */
 		private final CheckpointStreamFactory checkpointStreamFactory;
 
-		/** Id for the current checkpoint */
+		/** Id for the current checkpoint. */
 		private final long checkpointId;
 
-		/** Timestamp for the current checkpoint */
+		/** Timestamp for the current checkpoint. */
 		private final long checkpointTimestamp;
 
-		/** All sst files that were part of the last previously completed checkpoint */
+		/** All sst files that were part of the last previously completed checkpoint. */
 		private Set<StateHandleID> baseSstFiles;
 
-		/** The state meta data */
+		/** The state meta data. */
 		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
 
 		private FileSystem backupFileSystem;
@@ -888,8 +889,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 			}
 
-
-
 			synchronized (stateBackend.materializedSstFiles) {
 				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
 			}
@@ -1036,13 +1035,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
 
-		/** Current key-groups state handle from which we restore key-groups */
+		/** Current key-groups state handle from which we restore key-groups. */
 		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-		/** Current input stream we obtained from currentKeyGroupsStateHandle */
+		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
 		private FSDataInputStream currentStateHandleInStream;
-		/** Current data input view that wraps currentStateHandleInStream */
+		/** Current data input view that wraps currentStateHandleInStream. */
 		private DataInputView currentStateHandleInView;
-		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle */
+		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
 		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
 
 		/**
@@ -1082,7 +1081,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		/**
-		 * Restore one key groups state handle
+		 * Restore one key groups state handle.
 		 *
 		 * @throws IOException
 		 * @throws RocksDBException
@@ -1105,7 +1104,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		/**
-		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle
+		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
 		 *
 		 * @throws IOException
 		 * @throws ClassNotFoundException
@@ -1169,7 +1168,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		/**
-		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle
+		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
 		 *
 		 * @throws IOException
 		 * @throws RocksDBException
@@ -1376,7 +1375,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 								int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
 								byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
 								for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-									startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+									startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
 								}
 
 								iterator.seek(startKeyGroupPrefixBytes);
@@ -1430,7 +1429,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 								columnFamilyHandle, stateMetaInfo));
 					}
 
-
 					// use the restore sst files as the base for succeeding checkpoints
 					synchronized (stateBackend.materializedSstFiles) {
 						stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
@@ -1480,7 +1478,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
 
-				if (! (rawStateHandle instanceof IncrementalKeyedStateHandle)) {
+				if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
 					throw new IllegalStateException("Unexpected state handle type, " +
 						"expected " + IncrementalKeyedStateHandle.class +
 						", but found " + rawStateHandle.getClass());

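The seek-prefix hunk above (the one adding the explicit "(byte)" cast) writes the start key group as a big-endian, fixed-width prefix before seeking the RocksDB iterator. A standalone sketch of that serialization; the method and parameter names here are illustrative, not from the commit.

    // Write a key-group id as keyGroupPrefixBytes big-endian bytes, so that the
    // lexicographic byte order in RocksDB matches the numeric key-group order.
    static byte[] keyGroupPrefix(int keyGroup, int keyGroupPrefixBytes) {
        byte[] prefix = new byte[keyGroupPrefixBytes];
        for (int j = 0; j < keyGroupPrefixBytes; ++j) {
            prefix[j] = (byte) (keyGroup >>> ((keyGroupPrefixBytes - j - 1) * Byte.SIZE));
        }
        return prefix;
    }
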
http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index a8b20d1..9d3e97e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -50,7 +50,7 @@ public class RocksDBListState<K, N, V>
 	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
 	implements InternalListState<N, V> {
 
-	/** Serializer for the values */
+	/** Serializer for the values. */
 	private final TypeSerializer<V> valueSerializer;
 
 	/**
@@ -100,7 +100,7 @@ public class RocksDBListState<K, N, V>
 				}
 			}
 			return result;
-		} catch (IOException|RocksDBException e) {
+		} catch (IOException | RocksDBException e) {
 			throw new RuntimeException("Error while retrieving data from RocksDB", e);
 		}
 	}
@@ -131,7 +131,7 @@ public class RocksDBListState<K, N, V>
 		final int keyGroup = backend.getCurrentKeyGroupIndex();
 
 		try {
-			// create the target full-binary-key 
+			// create the target full-binary-key
 			writeKeyWithGroupAndNamespace(
 					keyGroup, key, target,
 					keySerializationStream, keySerializationDataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5125240..75c1651 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -45,7 +46,7 @@ import java.util.Map;
 
 /**
  * {@link MapState} implementation that stores state in RocksDB.
- * <p>
+ *
  * <p>{@link RocksDBStateBackend} must ensure that we set the
  * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
  * we use the {@code merge()} call.
@@ -58,10 +59,10 @@ import java.util.Map;
 public class RocksDBMapState<K, N, UK, UV>
 	extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
 	implements InternalMapState<N, UK, UV> {
-	
-	private static Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
 
-	/** Serializer for the keys and values */
+	private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
+
+	/** Serializer for the keys and values. */
 	private final TypeSerializer<UK> userKeySerializer;
 	private final TypeSerializer<UV> userValueSerializer;
 
@@ -105,19 +106,19 @@ public class RocksDBMapState<K, N, UK, UV>
 
 	@Override
 	public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
-		
+
 		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
 		byte[] rawValueBytes = serializeUserValue(userValue);
 
 		backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 	}
-	
+
 	@Override
 	public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
 		if (map == null) {
 			return;
 		}
-		
+
 		for (Map.Entry<UK, UV> entry : map.entrySet()) {
 			put(entry.getKey(), entry.getValue());
 		}
@@ -137,7 +138,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		return (rawValueBytes != null);
 	}
-	
+
 	@Override
 	public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
 		final Iterator<Map.Entry<UK, UV>> iterator = iterator();
@@ -158,7 +159,7 @@ public class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public Iterable<UK> keys() throws IOException, RocksDBException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
-		
+
 		return new Iterable<UK>() {
 			@Override
 			public Iterator<UK> iterator() {
@@ -176,7 +177,7 @@ public class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public Iterable<UV> values() throws IOException, RocksDBException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
-		
+
 		return new Iterable<UV>() {
 			@Override
 			public Iterator<UV> iterator() {
@@ -202,7 +203,7 @@ public class RocksDBMapState<K, N, UK, UV>
 			}
 		};
 	}
-	
+
 	@Override
 	public void clear() {
 		try {
@@ -216,7 +217,7 @@ public class RocksDBMapState<K, N, UK, UV>
 			LOG.warn("Error while cleaning the state.", e);
 		}
 	}
-	
+
 	@Override
 	@SuppressWarnings("unchecked")
 	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
@@ -229,7 +230,7 @@ public class RocksDBMapState<K, N, UK, UV>
 				namespaceSerializer);
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-		
+
 		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
 		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
 		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView);
@@ -246,7 +247,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		if (!iterator.hasNext()) {
 			return null;
 		}
-		
+
 		return KvStateRequestSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() {
 			@Override
 			public Iterator<Map.Entry<UK, UV>> iterator() {
@@ -254,21 +255,21 @@ public class RocksDBMapState<K, N, UK, UV>
 			}
 		}, userKeySerializer, userValueSerializer);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Serialization Methods
 	// ------------------------------------------------------------------------
-	
+
 	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
 		writeCurrentKeyWithGroupAndNamespace();
-		
+
 		return keySerializationStream.toByteArray();
 	}
 
 	private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
 		writeCurrentKeyWithGroupAndNamespace();
 		userKeySerializer.serialize(userKey, keySerializationDataOutputView);
-		
+
 		return keySerializationStream.toByteArray();
 	}
 
@@ -282,7 +283,6 @@ public class RocksDBMapState<K, N, UK, UV>
 			userValueSerializer.serialize(userValue, keySerializationDataOutputView);
 		}
 
-		
 		return keySerializationStream.toByteArray();
 	}
 
@@ -291,7 +291,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 
 		readKeyWithGroupAndNamespace(bais, in);
-	
+
 		return userKeySerializer.deserialize(in);
 	}
 
@@ -303,20 +303,20 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		return isNull ? null : userValueSerializer.deserialize(in);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Internal Classes
 	// ------------------------------------------------------------------------
-	
-	/** A map entry in RocksDBMapState */
+
+	/** A map entry in RocksDBMapState. */
 	private class RocksDBMapEntry implements Map.Entry<UK, UV> {
 		private final RocksDB db;
-		
+
 		/** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB
 		 * with the format #KeyGroup#Key#Namespace#UserKey. */
 		private final byte[] rawKeyBytes;
-		
-		/** The raw bytes of the value stored in RocksDB */
+
+		/** The raw bytes of the value stored in RocksDB. */
 		private byte[] rawValueBytes;
 
 		/** True if the entry has been deleted. */
@@ -329,7 +329,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) {
 			this.db = db;
-			
+
 			this.rawKeyBytes = rawKeyBytes;
 			this.rawValueBytes = rawValueBytes;
 			this.deleted = false;
@@ -383,7 +383,7 @@ public class RocksDBMapState<K, N, UK, UV>
 			}
 
 			UV oldValue = getValue();
-			
+
 			try {
 				userValue = value;
 				rawValueBytes = serializeUserValue(value);
@@ -400,22 +400,22 @@ public class RocksDBMapState<K, N, UK, UV>
 	/** An auxiliary utility to scan all entries under the given key. */
 	private abstract class RocksDBMapIterator<T> implements Iterator<T> {
 
-		final static int CACHE_SIZE_BASE = 1;
-		final static int CACHE_SIZE_LIMIT = 128;
+		static final int CACHE_SIZE_BASE = 1;
+		static final int CACHE_SIZE_LIMIT = 128;
 
 		/** The db where data resides. */
 		private final RocksDB db;
 
-		/** 
+		/**
 		 * The prefix bytes of the key being accessed. All entries under the same key
 		 * has the same prefix, hence we can stop the iterating once coming across an
-		 * entry with a different prefix. 
+		 * entry with a different prefix.
 		 */
 		private final byte[] keyPrefixBytes;
 
 		/**
 		 * True if all entries have been accessed or the iterator has come across an
-		 * entry with a different prefix. 
+		 * entry with a different prefix.
 		 */
 		private boolean expired = false;
 
@@ -423,7 +423,6 @@ public class RocksDBMapState<K, N, UK, UV>
 		private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
 		private int cacheIndex = 0;
 
-
 		RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) {
 			this.db = db;
 			this.keyPrefixBytes = keyPrefixBytes;
@@ -440,7 +439,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		public void remove() {
 			if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
 				throw new IllegalStateException("The remove operation must be called after an valid next operation.");
-			} 
+			}
 
 			RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
 			lastEntry.remove();
@@ -489,7 +488,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 			iterator.seek(startBytes);
 
-			/* 
+			/*
 			 * If the last returned entry is not deleted, it will be the first entry in the
 			 * iterating. Skip it to avoid redundant access in such cases.
 			 */
@@ -515,7 +514,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 			iterator.close();
 		}
-		
+
 		private boolean underSameKey(byte[] rawKeyBytes) {
 			if (rawKeyBytes.length < keyPrefixBytes.length) {
 				return false;
@@ -530,4 +529,4 @@ public class RocksDBMapState<K, N, UK, UV>
 			return true;
 		}
 	}
-}
\ No newline at end of file
+}
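
The map iterator above stops once it reaches an entry whose key no longer starts with the current #KeyGroup#Key#Namespace prefix (underSameKey). A standalone sketch of that check; only the length guard and the final return are visible in the hunk, the byte-by-byte loop is assumed.

    // Returns true while rawKeyBytes still starts with the current key prefix.
    static boolean underSameKey(byte[] rawKeyBytes, byte[] keyPrefixBytes) {
        if (rawKeyBytes.length < keyPrefixBytes.length) {
            return false;
        }
        for (int i = 0; i < keyPrefixBytes.length; ++i) {
            if (rawKeyBytes[i] != keyPrefixBytes[i]) {
                return false;
            }
        }
        return true;
    }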

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index ccc98a7..b5fe95f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -46,10 +46,10 @@ public class RocksDBReducingState<K, N, V>
 	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V>
 	implements InternalReducingState<N, V> {
 
-	/** Serializer for the values */
+	/** Serializer for the values. */
 	private final TypeSerializer<V> valueSerializer;
 
-	/** User-specified reduce function */
+	/** User-specified reduce function. */
 	private final ReduceFunction<V> reduceFunction;
 
 	/**
@@ -88,7 +88,7 @@ public class RocksDBReducingState<K, N, V>
 				return null;
 			}
 			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
-		} catch (IOException|RocksDBException e) {
+		} catch (IOException | RocksDBException e) {
 			throw new RuntimeException("Error while retrieving data from RocksDB", e);
 		}
 	}
@@ -157,7 +157,7 @@ public class RocksDBReducingState<K, N, V>
 
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
-				// create the target full-binary-key 
+				// create the target full-binary-key
 				writeKeyWithGroupAndNamespace(
 						keyGroup, key, target,
 						keySerializationStream, keySerializationDataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 2b70dcd..4a30489 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.NativeLibraryLoader;
@@ -69,10 +70,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
 
-	/** The number of (re)tries for loading the RocksDB JNI library */
+	/** The number of (re)tries for loading the RocksDB JNI library. */
 	private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
 
-	
+
 	private static boolean rocksDbInitialized = false;
 
 	// ------------------------------------------------------------------------
@@ -93,23 +94,23 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	/** Base paths for RocksDB directory, as configured. May be null. */
 	private Path[] configuredDbBasePaths;
 
-	/** Base paths for RocksDB directory, as initialized */
+	/** Base paths for RocksDB directory, as initialized. */
 	private File[] initializedDbBasePaths;
 
 	private int nextDirectory;
 
 	// RocksDB options
 
-	/** The pre-configured option settings */
+	/** The pre-configured option settings. */
 	private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
 
-	/** The options factory to create the RocksDB options in the cluster */
+	/** The options factory to create the RocksDB options in the cluster. */
 	private OptionsFactory optionsFactory;
 
 	/** Whether we already lazily initialized our local storage directories. */
 	private transient boolean isInitialized = false;
 
-	/** True if incremental checkpointing is enabled */
+	/** True if incremental checkpointing is enabled. */
 	private boolean enableIncrementalCheckpointing;
 
 
@@ -183,10 +184,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
 	 * checkpoint data streams. Typically, one would supply a filesystem or database state backend
 	 * here where the snapshots from RocksDB would be stored.
-	 * 
+	 *
 	 * <p>The snapshots of the RocksDB state will be stored using the given backend's
-	 * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}. 
-	 * 
+	 * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
+	 *
 	 * @param checkpointStreamBackend The backend to store the
 	 */
 	public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index bd9bcaa..f0569b8 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -39,13 +39,13 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBSt
 
 	private static final long serialVersionUID = 4906988360901930371L;
 
-	/** The key under which the config stores the directory where checkpoints should be stored */
+	/** The key under which the config stores the directory where checkpoints should be stored. */
 	public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
-	/** The key under which the config stores the directory where RocksDB should be stored */
+	/** The key under which the config stores the directory where RocksDB should be stored. */
 	public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir";
 
 	@Override
-	public RocksDBStateBackend createFromConfig(Configuration config) 
+	public RocksDBStateBackend createFromConfig(Configuration config)
 			throws IllegalConfigurationException, IOException {
 
 		final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index b2a4fba..da21e8a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -42,7 +43,7 @@ public class RocksDBValueState<K, N, V>
 	extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V>
 	implements InternalValueState<N, V> {
 
-	/** Serializer for the values */
+	/** Serializer for the values. */
 	private final TypeSerializer<V> valueSerializer;
 
 	/**
@@ -80,7 +81,7 @@ public class RocksDBValueState<K, N, V>
 				return stateDesc.getDefaultValue();
 			}
 			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
-		} catch (IOException|RocksDBException e) {
+		} catch (IOException | RocksDBException e) {
 			throw new RuntimeException("Error while retrieving data from RocksDB.", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
index 695aa12..024d12e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -68,6 +68,12 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		}
 	}
 
+	/**
+	 * This class exists to provide a good error message if a user attempts to restore from a semi async snapshot.
+	 *
+	 * <p>see FLINK-5468
+	 */
+	@Deprecated
 	public static class FinalSemiAsyncSnapshot {
 
 		static {
@@ -75,7 +81,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		}
 
 		private static void throwExceptionOnLoadingThisClass() {
-			throw new RuntimeException("Attempt to requiresMigration RocksDB state created with semi async snapshot mode failed. "
+			throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. "
 					+ "Unfortunately, this is not supported. Please create a new savepoint for the job using fully "
 					+ "async mode in Flink 1.1 and run migration again with the new savepoint.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
index 1b65466..f3065ab 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
@@ -36,8 +36,9 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static java.util.Arrays.asList;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the {@link InternalAggregatingState} implementation on top of RocksDB.

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 812babb..d2edf0e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -61,12 +60,10 @@ import org.apache.flink.util.FutureUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
-
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -408,7 +405,7 @@ public class RocksDBAsyncSnapshotTest {
 							} catch (InterruptedException e) {
 								Thread.currentThread().interrupt();
 							}
-							if(closed) {
+							if (closed) {
 								throw new IOException("Stream closed.");
 							}
 							super.write(b);
@@ -422,7 +419,7 @@ public class RocksDBAsyncSnapshotTest {
 							} catch (InterruptedException e) {
 								Thread.currentThread().interrupt();
 							}
-							if(closed) {
+							if (closed) {
 								throw new IOException("Stream closed.");
 							}
 							super.write(b, off, len);
@@ -439,7 +436,7 @@ public class RocksDBAsyncSnapshotTest {
 		}
 	}
 
-	public static class AsyncCheckpointOperator
+	private static class AsyncCheckpointOperator
 		extends AbstractStreamOperator<String>
 		implements OneInputStreamOperator<String, String>, StreamCheckpointedOperator {
 
@@ -480,9 +477,4 @@ public class RocksDBAsyncSnapshotTest {
 		}
 
 	}
-
-	public static class DummyMapFunction<T> implements MapFunction<T, T> {
-		@Override
-		public T map(T value) { return value; }
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
index 7343b56..565f27d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.junit.Test;
 
 /**
- * This test checks that the RocksDB native code loader still responds to resetting the
+ * This test checks that the RocksDB native code loader still responds to resetting the init flag.
  */
 public class RocksDBInitResetTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
index e7efcfa..c6ccd5d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
@@ -64,9 +64,9 @@ public class RocksDBListStateTest {
 		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
 
 		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-		
+
 		try {
-			InternalListState<VoidNamespace, Long> state = 
+			InternalListState<VoidNamespace, Long> state =
 					keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
 			state.setCurrentNamespace(VoidNamespace.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index f5bcf86..1d14f6e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,6 +39,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+/**
+ * Tests for the RocksDBMergeIterator.
+ */
 public class RocksDBMergeIteratorTest {
 
 	private static final int NUM_KEY_VAL_STATES = 50;

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
index a8b4535..0733dce 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
@@ -54,7 +54,7 @@ public class RocksDBReducingStateTest {
 	@Test
 	public void testAddAndGet() throws Exception {
 
-		final ReducingStateDescriptor<Long> stateDescr = 
+		final ReducingStateDescriptor<Long> stateDescr =
 				new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
 		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
 
@@ -62,9 +62,9 @@ public class RocksDBReducingStateTest {
 		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
 
 		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-		
+
 		try {
-			InternalReducingState<VoidNamespace, Long> state = 
+			InternalReducingState<VoidNamespace, Long> state =
 					keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
 			state.setCurrentNamespace(VoidNamespace.INSTANCE);
 
@@ -126,7 +126,7 @@ public class RocksDBReducingStateTest {
 		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
 
 		try {
-			final InternalReducingState<TimeWindow, Long> state = 
+			final InternalReducingState<TimeWindow, Long> state =
 					keyedBackend.createReducingState(new TimeWindow.Serializer(), stateDescr);
 
 			// populate the different namespaces

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 463dd44..ff433ad 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -32,6 +31,8 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+
+import org.apache.commons.io.FileUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,7 +60,7 @@ import static org.mockito.Mockito.when;
 
 
 /**
- * Tests for configuring the RocksDB State Backend
+ * Tests for configuring the RocksDB State Backend.
  */
 @SuppressWarnings("serial")
 public class RocksDBStateBackendConfigTest {
@@ -102,7 +103,6 @@ public class RocksDBStateBackendConfigTest {
 						new KeyGroupRange(0, 0),
 						env.getTaskKvStateRegistry());
 
-
 		File instanceBasePath = keyedBackend.getInstanceBasePath();
 		assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
 
@@ -158,7 +158,6 @@ public class RocksDBStateBackendConfigTest {
 						new KeyGroupRange(0, 0),
 						env.getTaskKvStateRegistry());
 
-
 		File instanceBasePath = keyedBackend.getInstanceBasePath();
 		assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 9eb662a..5a937c4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -22,6 +22,9 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the RocksDBStateBackendFactory.
+ */
 public class RocksDBStateBackendFactoryTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 8d0db69..8b44a47 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.IOFileFilter;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
@@ -42,6 +40,9 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -372,7 +373,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 			ValueState<String> state =
 				backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-
 			Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
 			SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
 			for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index c53fa3e..4ec6532 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -27,7 +27,7 @@ import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotEquals;
 
 /**
  * This test validates that the RocksDB JNI library loading works properly
@@ -60,7 +60,7 @@ public class RocksDbMultiClassLoaderTest {
 
 		final String tempDir = tmp.newFolder().getAbsolutePath();
 
- 		final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
+		final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
 		final Method meth2 = clazz2.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
 		meth1.setAccessible(true);
 		meth2.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/60721e07/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 7147583..3231e96 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.Options;
@@ -47,7 +46,7 @@ import java.util.Arrays;
 public class RocksDBPerformanceTest extends TestLogger {
 
 	@Rule
-	public final TemporaryFolder TMP = new TemporaryFolder();
+	public final TemporaryFolder tmp = new TemporaryFolder();
 
 	@Rule
 	public final RetryRule retry = new RetryRule();
@@ -55,7 +54,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 	@Test(timeout = 2000)
 	@RetryOnFailure(times = 3)
 	public void testRocksDbMergePerformance() throws Exception {
-		final File rocksDir = TMP.newFolder();
+		final File rocksDir = tmp.newFolder();
 
 		// ensure the RocksDB library is loaded to a distinct location each retry
 		NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
@@ -83,8 +82,8 @@ public class RocksDBPerformanceTest extends TestLogger {
 					.setSync(false)
 					.setDisableWAL(true);
 
-			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
-		{
+			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
+
 			// ----- insert -----
 			log.info("begin insert");
 
@@ -133,7 +132,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 	@Test(timeout = 2000)
 	@RetryOnFailure(times = 3)
 	public void testRocksDbRangeGetPerformance() throws Exception {
-		final File rocksDir = TMP.newFolder();
+		final File rocksDir = tmp.newFolder();
 
 		// ensure the RocksDB library is loaded to a distinct location each retry
 		NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
@@ -161,8 +160,8 @@ public class RocksDBPerformanceTest extends TestLogger {
 					.setSync(false)
 					.setDisableWAL(true);
 
-			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()))
-		{
+			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
+
 			final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
 
 			final Unsafe unsafe = MemoryUtils.UNSAFE;
@@ -205,7 +204,6 @@ public class RocksDBPerformanceTest extends TestLogger {
 		}
 	}
 
-
 	private static boolean samePrefix(byte[] prefix, byte[] key) {
 		for (int i = 0; i < prefix.length; i++) {
 			if (prefix[i] != key [i]) {