Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:48 UTC

[18/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-filesystem

[FLINK-6711] Activate strict checkstyle for flink-connector-filesystem


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7292c874
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7292c874
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7292c874

Branch: refs/heads/master
Commit: 7292c8743d981d61b0f860367e0266b307e1362f
Parents: fab8fe5
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:57:18 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:29 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-filesystem/pom.xml          |   3 +-
 .../connectors/fs/AvroKeyValueSinkWriter.java   |  60 +++++----
 .../flink/streaming/connectors/fs/Bucketer.java |   5 +-
 .../flink/streaming/connectors/fs/Clock.java    |   8 +-
 .../connectors/fs/DateTimeBucketer.java         |  18 +--
 .../connectors/fs/NonRollingBucketer.java       |   2 +
 .../streaming/connectors/fs/RollingSink.java    | 109 ++++++++--------
 .../connectors/fs/SequenceFileWriter.java       |   5 +-
 .../connectors/fs/StreamWriterBase.java         |  21 +--
 .../streaming/connectors/fs/StringWriter.java   |   1 +
 .../streaming/connectors/fs/SystemClock.java    |   2 +-
 .../flink/streaming/connectors/fs/Writer.java   |   4 +-
 .../fs/bucketing/BasePathBucketer.java          |   2 +
 .../connectors/fs/bucketing/Bucketer.java       |   6 +-
 .../connectors/fs/bucketing/BucketingSink.java  |  93 ++++++-------
 .../fs/bucketing/DateTimeBucketer.java          |  16 ++-
 .../fs/RollingSinkFaultToleranceITCase.java     |  15 +--
 .../connectors/fs/RollingSinkITCase.java        | 130 ++++++++-----------
 .../connectors/fs/RollingSinkSecuredITCase.java |  20 +--
 .../BucketingSinkFaultToleranceITCase.java      |  17 ++-
 .../BucketingSinkFrom12MigrationTest.java       |  17 ++-
 .../fs/bucketing/BucketingSinkTest.java         |  39 +++---
 .../fs/bucketing/RollingSinkMigrationTest.java  |   7 +-
 .../RollingToBucketingMigrationTest.java        |   7 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 26 files changed, 315 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
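
For orientation: the bulk of the diff below is mechanical. The strict checkstyle rules enforce, among
other things, grouped and ordered imports (Flink imports first, then third-party, then java.*), Javadoc
paragraphs that start with <p> on the same line as the text, constants declared static final, and a
space after keywords and commas. A small illustrative class written to those conventions (not part of
the patch):

    import org.apache.hadoop.fs.Path;

    import java.util.Map;

    /**
     * Example of the enforced Javadoc style.
     *
     * <p>Paragraphs start with {@code <p>} on the same line as the text.
     */
    public class CheckstyleExample {

        private static final String DEFAULT_PENDING_SUFFIX = ".pending";

        /**
         * Returns the suffix configured for the given path, or the default.
         */
        public static String suffixFor(Map<String, String> conf, Path path) {
            if (conf == null) {
                return DEFAULT_PENDING_SUFFIX;
            }
            String suffix = conf.get(path.getName());
            return suffix != null ? suffix : DEFAULT_PENDING_SUFFIX;
        }
    }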


http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml
index 07b0ae1..f39758b 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -65,7 +65,7 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -104,7 +104,6 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
-
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 3e3c86b..45e73fe 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -18,10 +18,11 @@ package org.apache.flink.streaming.connectors.fs;
  * limitations under the License.
  */
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
@@ -31,15 +32,15 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+
 /**
 * Implementation of AvroKeyValue writer that can be used in Sink.
 * Each entry would be wrapped in GenericRecord with key/value fields(same as in m/r lib)
@@ -49,7 +50,7 @@ Usage:
 		BucketingSink<Tuple2<Long, Long>> sink = new BucketingSink<Tuple2<Long, Long>>("/tmp/path");
 		sink.setBucketer(new DateTimeBucketer<Tuple2<Long, Long>>("yyyy-MM-dd/HH/mm/"));
 		sink.setPendingSuffix(".avro");
-		Map<String,String> properties = new HashMap<>();
+		Map<String, String> properties = new HashMap<>();
 		Schema longSchema = Schema.create(Type.LONG);
 		String keySchema = longSchema.toString();
 		String valueSchema = longSchema.toString();
@@ -57,7 +58,7 @@ Usage:
 		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
 		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
 		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
-		
+
 		sink.setWriter(new AvroSinkWriter<Long, Long>(properties));
 		sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB,
 }
@@ -77,37 +78,37 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 	private final Map<String, String> properties;
 
 	/**
-	 * C'tor for the writer
-	 * <p>
-	 * You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above)
+	 * C'tor for the writer.
+	 *
+	 * <p>You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above)
 	 * @param properties
 	 */
 	@SuppressWarnings("deprecation")
 	public AvroKeyValueSinkWriter(Map<String, String> properties) {
 		this.properties = properties;
-		
+
 		String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
 		if (keySchemaString == null) {
 			throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
 		}
-		Schema.parse(keySchemaString);//verifying that schema valid
-		
+		Schema.parse(keySchemaString); //verifying that schema valid
+
 		String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
 		if (valueSchemaString == null) {
 			throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
 		}
-		Schema.parse(valueSchemaString);//verifying that schema valid
+		Schema.parse(valueSchemaString); //verifying that schema valid
 	}
 
-	private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
+	private boolean getBoolean(Map<String, String> conf, String key, boolean def) {
 		String value = conf.get(key);
 		if (value == null) {
 			return def;
 		}
 		return Boolean.parseBoolean(value);
 	}
-	
-	private int getInt(Map<String,String> conf, String key, int def) {
+
+	private int getInt(Map<String, String> conf, String key, int def) {
 		String value = conf.get(key);
 		if (value == null) {
 			return def;
@@ -116,7 +117,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 	}
 
 	//this derived from AvroOutputFormatBase.getCompressionCodec(..)
-	private CodecFactory getCompressionCodec(Map<String,String> conf) {
+	private CodecFactory getCompressionCodec(Map<String, String> conf) {
 		if (getBoolean(conf, CONF_COMPRESS, false)) {
 			int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
 			int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
@@ -147,12 +148,12 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 
 	@Override
 	public void close() throws IOException {
-		super.close();//the order is important since super.close flushes inside
+		super.close(); //the order is important since super.close flushes inside
 		if (keyValueWriter != null) {
 			keyValueWriter.close();
 		}
 	}
-	
+
 	@Override
 	public long flush() throws IOException {
 		if (keyValueWriter != null) {
@@ -184,7 +185,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 	public Writer<Tuple2<K, V>> duplicate() {
 		return new AvroKeyValueSinkWriter<K, V>(properties);
 	}
-	
+
 	// taken from m/r avro lib to remove dependency on it
 	private static final class AvroKeyValueWriter<K, V> {
 		/** A writer for the Avro container file. */
@@ -245,7 +246,12 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 		}
 	}
 
-	// taken from AvroKeyValue avro-mapr lib
+	/**
+	 * A reusable Avro generic record for writing key/value pairs to the
+	 * file.
+	 *
+	 * <p>taken from AvroKeyValue avro-mapr lib
+	 */
 	public static class AvroKeyValue<K, V> {
 		/** The name of the key value pair generic record. */
 		public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
@@ -293,7 +299,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 
 		/**
 		 * Creates a KeyValuePair generic record schema.
-		 * 
+		 *
 		 * @return A schema for a generic record with two fields: 'key' and
 		 *         'value'.
 		 */
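
The AvroKeyValue helper documented above wraps every (key, value) entry in a two-field generic record
before it is written to the Avro container file. A rough, hand-built equivalent of that record and its
schema, for illustration only (record and field names are taken from the Javadoc; this is not the
class's own code):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class KeyValuePairSketch {

        public static void main(String[] args) {
            Schema longSchema = Schema.create(Schema.Type.LONG);

            // A record named "KeyValuePair" with the fields 'key' and 'value', roughly what
            // AvroKeyValue.getSchema(keySchema, valueSchema) produces for two long schemas.
            Schema pairSchema = SchemaBuilder.record("KeyValuePair")
                    .fields()
                    .name("key").type(longSchema).noDefault()
                    .name("value").type(longSchema).noDefault()
                    .endRecord();

            GenericRecord pair = new GenericData.Record(pairSchema);
            pair.put("key", 42L);
            pair.put("value", 4711L);
            System.out.println(pair);
        }
    }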

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
index 24ad6ab..9caf628 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.hadoop.fs.Path;
@@ -25,8 +26,8 @@ import java.io.Serializable;
  * A bucketer is used with a {@link RollingSink}
  * to put emitted elements into rolling files.
  *
- * <p>
- * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
+ *
+ * <p>The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
  * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
  * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
  * based on system time.

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
index 174707c..eb864c2 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -15,19 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.fs;
 
+package org.apache.flink.streaming.connectors.fs;
 
 /**
  * A clock that can provide the current time.
  *
- * <p>
- * Normally this would be system time, but for testing a custom {@code Clock} can be provided.
+ *
+ * <p>Normally this would be system time, but for testing a custom {@code Clock} can be provided.
  */
 public interface Clock {
 
 	/**
 	 * Return the current system time in milliseconds.
 	 */
-	public long currentTimeMillis();
+	long currentTimeMillis();
 }
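
Since the Javadoc above points out that a custom Clock can be supplied for testing, here is a minimal
sketch of such an implementation (hypothetical, not part of this commit):

    import org.apache.flink.streaming.connectors.fs.Clock;

    /**
     * A {@link Clock} whose time is set manually, useful for deterministic bucketing tests.
     */
    public class ManualClock implements Clock {

        private volatile long currentTime;

        public void setCurrentTime(long currentTime) {
            this.currentTime = currentTime;
        }

        @Override
        public long currentTimeMillis() {
            return currentTime;
        }
    }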

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
index 0df8998..72b4823 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.hadoop.fs.Path;
@@ -29,27 +30,27 @@ import java.util.Date;
 /**
  * A {@link Bucketer} that assigns to buckets based on current system time.
  *
- * <p>
- * The {@code DateTimeBucketer} will create directories of the following form:
+ *
+ * <p>The {@code DateTimeBucketer} will create directories of the following form:
  * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
  * that was specified as a base path when creating the
  * {@link RollingSink}. The {@code dateTimePath}
  * is determined based on the current system time and the user provided format string.
  *
- * <p>
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
+ *
+ * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
  * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
  * files will have a granularity of hours.
  *
  *
- * <p>
- * Example:
+ *
+ * <p>Example:
  *
  * <pre>{@code
  *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
  * }</pre>
  *
- * This will create for example the following bucket path:
+ * <p>This will create for example the following bucket path:
  * {@code /base/1976-12-31-14/}
  *
  * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead.
@@ -57,7 +58,7 @@ import java.util.Date;
 @Deprecated
 public class DateTimeBucketer implements Bucketer {
 
-	private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
 
 	private static final long serialVersionUID = 1L;
 
@@ -95,7 +96,6 @@ public class DateTimeBucketer implements Bucketer {
 		this.dateFormatter = new SimpleDateFormat(formatString);
 	}
 
-
 	@Override
 	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
 		String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
index 6854596..a03bcb5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
+
 import org.apache.hadoop.fs.Path;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 429d00a..3d3ea05 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
-import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,13 +58,13 @@ import java.util.UUID;
  * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
  * is integrated with the checkpointing mechanism to provide exactly once semantics.
  *
- * <p>
- * When creating the sink a {@code basePath} must be specified. The base directory contains
+ *
+ * <p>When creating the sink a {@code basePath} must be specified. The base directory contains
  * one directory for every bucket. The bucket directories themselves contain several part files.
  * These contain the actual written data.
  *
- * <p>
- * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
+ *
+ * <p>The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
  * base directory. Whenever the {@code Bucketer} returns a different directory name than
  * it returned before the sink will close the current part files inside that bucket
  * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
@@ -71,8 +73,8 @@ import java.util.UUID;
  * {@link NonRollingBucketer} if you don't want to have
  * buckets but still write part files in a fault-tolerant way.
  *
- * <p>
- * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
+ *
+ * <p>The filenames of the part files contain the part prefix, the parallel subtask index of the sink
  * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
  * {@code "part"} but this can be
  * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
@@ -80,8 +82,8 @@ import java.util.UUID;
  * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
  * using {@link #setBatchSize(long)}.
  *
- * <p>
- * Part files can be in one of three states: in-progress, pending or finished. The reason for this
+ *
+ * <p>Part files can be in one of three states: in-progress, pending or finished. The reason for this
  * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
  * and fault-tolerance. The part file that is currently being written to is in-progress. Once
  * a part file is closed for writing it becomes pending. When a checkpoint is successful the
@@ -95,21 +97,21 @@ import java.util.UUID;
  * the different file states and valid-length files can be configured, for example with
  * {@link #setPendingSuffix(String)}.
  *
- * <p>
- * Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
+ *
+ * <p>Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
  * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
  * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
  *
- * <p>
- * The part files are written using an instance of {@link Writer}. By default
+ *
+ * <p>The part files are written using an instance of {@link Writer}. By default
  * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
  * of {@code toString()} for every element. Separated by newlines. You can configure the writer
  * using {@link #setWriter(Writer)}. For example,
  * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
  * Hadoop {@code SequenceFiles}.
  *
- * <p>
- * Example:
+ *
+ * <p>Example:
  *
  * <pre>{@code
  *     new RollingSink<Tuple2<IntWritable, Text>>(outPath)
@@ -117,7 +119,7 @@ import java.util.UUID;
  *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")
  * }</pre>
  *
- * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
+ * <p>This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
  *
  * @see DateTimeBucketer
  * @see StringWriter
@@ -134,8 +136,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 
 	private static final long serialVersionUID = 1L;
 
-	private static Logger LOG = LoggerFactory.getLogger(RollingSink.class);
-
+	private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);
 
 	// --------------------------------------------------------------------------------------------
 	//  User configuration values
@@ -145,53 +146,52 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * The default maximum size of part files (currently {@code 384 MB}).
 	 */
-	private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+	private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
 
 	/**
 	 * This is used for part files that we are writing to but which where not yet confirmed
 	 * by a checkpoint.
 	 */
-	private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
 
 	/**
-	 * See above, but for prefix
+	 * See above, but for prefix.
 	 */
-	private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+	private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
 
 	/**
 	 * This is used for part files that we are not writing to but which are not yet confirmed by
 	 * checkpoint.
 	 */
-	private final String DEFAULT_PENDING_SUFFIX = ".pending";
+	private static final String DEFAULT_PENDING_SUFFIX = ".pending";
 
 	/**
 	 * See above, but for prefix.
 	 */
-	private final String DEFAULT_PENDING_PREFIX = "_";
+	private static final String DEFAULT_PENDING_PREFIX = "_";
 
 	/**
 	 * When truncate() is not supported on the used FileSystem we instead write a
 	 * file along the part file with this ending that contains the length up to which
 	 * the part file is valid.
 	 */
-	private final String DEFAULT_VALID_SUFFIX = ".valid-length";
+	private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
 
 	/**
 	 * See above, but for prefix.
 	 */
-	private final String DEFAULT_VALID_PREFIX = "_";
+	private static final String DEFAULT_VALID_PREFIX = "_";
 
 	/**
 	 * The default prefix for part files.
 	 */
-	private final String DEFAULT_PART_REFIX = "part";
+	private static final String DEFAULT_PART_REFIX = "part";
 
 	/**
 	 * The default timeout for asynchronous operations such as recoverLease and truncate. In
 	 * milliseconds.
 	 */
-	private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
-
+	private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
 
 	/**
 	 * The base {@code Path} that stores all bucket directories.
@@ -228,7 +228,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	private String pendingPrefix = DEFAULT_PENDING_PREFIX;
 
 	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
-	private String validLengthPrefix= DEFAULT_VALID_PREFIX;
+	private String validLengthPrefix = DEFAULT_VALID_PREFIX;
 
 	private String partPrefix = DEFAULT_PART_REFIX;
 
@@ -242,7 +242,6 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	//  Internal fields (not configurable by user)
 	// --------------------------------------------------------------------------------------------
 
-
 	/**
 	 * The part file that we are currently writing to.
 	 */
@@ -266,7 +265,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 
 	/**
 	 * We use reflection to get the .truncate() method, this is only available starting with
-	 * Hadoop 2.7
+	 * Hadoop 2.7.
 	 */
 	private transient Method refTruncate;
 
@@ -290,8 +289,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * Creates a new {@code RollingSink} that writes files to the given base directory.
 	 *
-	 * <p>
-	 * This uses a{@link DateTimeBucketer} as bucketer and a {@link StringWriter} has writer.
+	 *
+	 * <p>This uses a{@link DateTimeBucketer} as bucketer and a {@link StringWriter} has writer.
 	 * The maximum bucket size is set to 384 MB.
 	 *
 	 * @param basePath The directory to which to write the bucket files.
@@ -319,7 +318,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	 */
 	public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
 		this.fsConfig = new Configuration();
-		for(Map.Entry<String, String> entry : config) {
+		for (Map.Entry<String, String> entry : config) {
 			fsConfig.setString(entry.getKey(), entry.getValue());
 		}
 		return this;
@@ -380,7 +379,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	}
 
 	/**
-	 * Create a file system with the user-defined hdfs config
+	 * Create a file system with the user-defined hdfs config.
 	 * @throws IOException
 	 */
 	private void initFileSystem() throws IOException {
@@ -415,8 +414,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * Determines whether we should change the bucket file we are writing to.
 	 *
-	 * <p>
-	 * This will roll if no file was created yet, if the file size is larger than the specified size
+	 *
+	 * <p>This will roll if no file was created yet, if the file size is larger than the specified size
 	 * or if the {@code Bucketer} determines that we should roll.
 	 */
 	private boolean shouldRoll() throws IOException {
@@ -449,8 +448,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * Opens a new part file.
 	 *
-	 * <p>
-	 * This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
+	 *
+	 * <p>This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
 	 */
 	private void openNewPartFile() throws Exception {
 		closeCurrentPartFile();
@@ -505,8 +504,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * Closes the current part file.
 	 *
-	 * <p>
-	 * This moves the current in-progress part file to a pending file and adds it to the list
+	 *
+	 * <p>This moves the current in-progress part file to a pending file and adds it to the list
 	 * of pending files in our bucket state.
 	 */
 	private void closeCurrentPartFile() throws Exception {
@@ -526,8 +525,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 
 	/**
 	 * Gets the truncate() call using reflection.
-	 * <p>
-	 * <b>NOTE: </b>This code comes from Flume
+	 *
+	 * <p><b>NOTE: </b>This code comes from Flume
 	 */
 	private Method reflectTruncate(FileSystem fs) {
 		Method m = null;
@@ -604,7 +603,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 			"The " + getClass().getSimpleName() + " has not been properly initialized.");
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-		
+
 		if (isWriterOpen) {
 			bucketState.currentFile = currentPartPath.toString();
 			bucketState.currentFileValidLength = writer.flush();
@@ -668,11 +667,11 @@ public class RollingSink<T> extends RichSinkFunction<T>
 						DistributedFileSystem dfs = (DistributedFileSystem) fs;
 						LOG.debug("Trying to recover file lease {}", partPath);
 						dfs.recoverLease(partPath);
-						boolean isclosed= dfs.isFileClosed(partPath);
+						boolean isclosed = dfs.isFileClosed(partPath);
 						StopWatch sw = new StopWatch();
 						sw.start();
-						while(!isclosed) {
-							if(sw.getTime() > asyncTimeout) {
+						while (!isclosed) {
+							if (sw.getTime() > asyncTimeout) {
 								break;
 							}
 							try {
@@ -691,8 +690,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 						StopWatch sw = new StopWatch();
 						sw.start();
 						long newLen = fs.getFileStatus(partPath).getLen();
-						while(newLen != bucketState.currentFileValidLength) {
-							if(sw.getTime() > asyncTimeout) {
+						while (newLen != bucketState.currentFileValidLength) {
+							if (sw.getTime() > asyncTimeout) {
 								break;
 							}
 							try {
@@ -749,7 +748,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 					}
 				} catch (IOException e) {
 					LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
-					throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to final path " + finalPath, e);
+					throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
 				}
 			}
 		}
@@ -785,8 +784,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * Sets the maximum bucket size in bytes.
 	 *
-	 * <p>
-	 * When a bucket part file becomes larger than this size a new bucket part file is started and
+	 *
+	 * <p>When a bucket part file becomes larger than this size a new bucket part file is started and
 	 * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
 	 *
 	 * @param batchSize The bucket part file size in bytes.
@@ -875,8 +874,8 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	/**
 	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
 	 *
-	 * <p>
-	 * This should only be disabled if using the sink without checkpoints, to not remove
+	 *
+	 * <p>This should only be disabled if using the sink without checkpoints, to not remove
 	 * the files already in the directory.
 	 *
 	 * @deprecated This option is deprecated and remains only for backwards compatibility.
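
Pulling the knobs described in the Javadoc above into one place, a configuration sketch for the
(deprecated) RollingSink. The setter names are those referenced in the Javadoc and are assumed to be
fluent, as in the Javadoc example; the values are illustrative:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;
    import org.apache.flink.streaming.connectors.fs.StringWriter;

    public class RollingSinkSketch {

        /** Hypothetical wiring; {@code stream} is any DataStream of Strings in the job. */
        public static void attachRollingSink(DataStream<String> stream) {
            RollingSink<String> sink = new RollingSink<String>("/base/path")
                    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
                    .setWriter(new StringWriter<String>())
                    .setBatchSize(1024L * 1024L * 128L) // roll part files at 128 MB instead of the 384 MB default
                    .setPendingSuffix(".pending")
                    .setPartPrefix("part");

            stream.addSink(sink);
        }
    }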

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 32cadec..901589f 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.fs;
 
+package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -88,7 +89,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 		}
 
 		CompressionCodec codec = null;
-		
+
 		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
 
 		if (!compressionCodecName.equals("None")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index a04e4b5..3e9eb11 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +37,7 @@ import java.util.EnumSet;
  */
 public abstract class StreamWriterBase<T> implements Writer<T> {
 
-	private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
+	private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
 
 	/**
 	 * The {@code FSDataOutputStream} for the current part file.
@@ -61,11 +63,11 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
 	/**
 	 * If hflush is available in this version of HDFS, then this method calls
 	 * hflush, else it calls sync.
+	 *
+	 * <p>Note: This code comes from Flume
+	 *
 	 * @param os - The stream to flush/sync
 	 * @throws java.io.IOException
-	 *
-	 * <p>
-	 * Note: This code comes from Flume
 	 */
 	protected void hflushOrSync(FSDataOutputStream os) throws IOException {
 		try {
@@ -80,8 +82,8 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
 			String msg = "Error while trying to hflushOrSync!";
 			LOG.error(msg + " " + e.getCause());
 			Throwable cause = e.getCause();
-			if(cause != null && cause instanceof IOException) {
-				throw (IOException)cause;
+			if (cause != null && cause instanceof IOException) {
+				throw (IOException) cause;
 			}
 			throw new RuntimeException(msg, e);
 		} catch (Exception e) {
@@ -94,12 +96,11 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
 	/**
 	 * Gets the hflush call using reflection. Fallback to sync if hflush is not available.
 	 *
-	 * <p>
-	 * Note: This code comes from Flume
+	 * <p>Note: This code comes from Flume
 	 */
 	private Method reflectHflushOrSync(FSDataOutputStream os) {
 		Method m = null;
-		if(os != null) {
+		if (os != null) {
 			Class<?> fsDataOutputStreamClass = os.getClass();
 			try {
 				m = fsDataOutputStreamClass.getMethod("hflush");

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 6568a86..d2ef9d6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.hadoop.fs.FSDataOutputStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
index 41663df..eedb370 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.fs;
 
+package org.apache.flink.streaming.connectors.fs;
 
 /**
  * A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
index c3b4cb6..ab896c8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -58,7 +60,7 @@ public interface Writer<T> extends Serializable {
 	 * taken. The call should close all state related to the current output file,
 	 * including the output stream opened in {@code open}.
 	 */
-	void close() throws IOException ;
+	void close() throws IOException;
 
 	/**
 	 * Writes one element to the bucket file.
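
The Writer contract described above (open a stream, write elements, flush, close) is usually
implemented by extending StreamWriterBase, which manages the underlying output stream. A hypothetical
minimal writer, assuming StreamWriterBase exposes the open stream via a protected getStream() method
as the bundled StringWriter does:

    import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
    import org.apache.flink.streaming.connectors.fs.Writer;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    /**
     * Writes each element's toString() followed by a newline (a stripped-down StringWriter).
     */
    public class NewlineWriter<T> extends StreamWriterBase<T> {

        private static final long serialVersionUID = 1L;

        @Override
        public void write(T element) throws IOException {
            // Assumption: getStream() returns the FSDataOutputStream opened by StreamWriterBase.
            byte[] bytes = (element.toString() + "\n").getBytes(StandardCharsets.UTF_8);
            getStream().write(bytes);
        }

        @Override
        public Writer<T> duplicate() {
            return new NewlineWriter<T>();
        }
    }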

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
index 0bf14b3..2f325f6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.flink.streaming.connectors.fs.Clock;
+
 import org.apache.hadoop.fs.Path;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
index 86aa9f3..f2eebf3 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.flink.streaming.connectors.fs.Clock;
+
 import org.apache.hadoop.fs.Path;
 
 import java.io.Serializable;
@@ -26,8 +28,8 @@ import java.io.Serializable;
  * A bucketer is used with a {@link BucketingSink}
  * to put emitted elements into rolling files.
  *
- * <p>
- * The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
+ *
+ * <p>The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
  * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket
  * path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
  * system time.
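
As a concrete illustration of that contract, a sketch of a Bucketer that derives the bucket from the
element itself rather than from time. This assumes the interface's single getBucketPath(Clock, Path, T)
method, as implemented by the bundled BasePathBucketer and DateTimeBucketer; it is not part of the
patch:

    import org.apache.flink.streaming.connectors.fs.Clock;
    import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;

    import org.apache.hadoop.fs.Path;

    /**
     * Puts each element into a bucket named after its first character (hypothetical example).
     */
    public class FirstLetterBucketer implements Bucketer<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public Path getBucketPath(Clock clock, Path basePath, String element) {
            String bucket = element.isEmpty() ? "empty" : element.substring(0, 1);
            return new Path(basePath, bucket);
        }
    }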

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 58dd4dc..70168b5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
-import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
@@ -41,6 +41,8 @@ import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -54,23 +56,23 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.Iterator;
 
 /**
  * Sink that emits its input elements to {@link FileSystem} files within
  * buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
  *
- * <p>
- * When creating the sink a {@code basePath} must be specified. The base directory contains
+ *
+ * <p>When creating the sink a {@code basePath} must be specified. The base directory contains
  * one directory for every bucket. The bucket directories themselves contain several part files,
  * one for each parallel subtask of the sink. These part files contain the actual output data.
  *
- * <p>
- * The sink uses a {@link Bucketer} to determine in which bucket directory each element should
+ *
+ * <p>The sink uses a {@link Bucketer} to determine in which bucket directory each element should
  * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
  * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
  * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
@@ -78,8 +80,8 @@ import java.util.Iterator;
  * {@link BasePathBucketer} if you don't want to have buckets but still want to write part-files
  * in a fault-tolerant way.
  *
- * <p>
- * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
+ *
+ * <p>The filenames of the part files contain the part prefix, the parallel subtask index of the sink
  * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
  * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default
  * the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
@@ -87,8 +89,8 @@ import java.util.Iterator;
  * the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB},
  * this can be configured using {@link #setBatchSize(long)}.
  *
- * <p>
- * In some scenarios, the open buckets are required to change based on time. In these cases, the sink
+ *
+ * <p>In some scenarios, the open buckets are required to change based on time. In these cases, the sink
  * needs to determine when a bucket has become inactive, in order to flush and close the part file.
  * To support this there are two configurable settings:
  * <ol>
@@ -97,17 +99,17 @@ import java.util.Iterator;
  *     <li>the minimum amount of time a bucket has to not receive any data before it is considered inactive,
  *     configured by {@link #setInactiveBucketThreshold(long)}</li>
  * </ol>
- * Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}.
+ * Both of these parameters default to {@code 60, 000 ms}, or {@code 1 min}.
  *
- * <p>
- * Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
+ *
+ * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
  * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
  * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
  * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
  * pending files will be moved to {@code finished}.
  *
- * <p>
- * If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
+ *
+ * <p>If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
  * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
  * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
  * they do not contain data that arrived after the checkpoint from which we restore. If the {@code FileSystem} supports
@@ -117,8 +119,8 @@ import java.util.Iterator;
  * to that point. The prefixes and suffixes for the different file states and valid-length files can be configured
  * using the adequate setter method, e.g. {@link #setPendingSuffix(String)}.
  *
- * <p>
- * <b>NOTE:</b>
+ *
+ * <p><b>NOTE:</b>
  * <ol>
  *     <li>
  *         If checkpointing is not enabled the pending files will never be moved to the finished state. In that case,
@@ -134,15 +136,15 @@ import java.util.Iterator;
  *     </li>
  * </ol>
  *
- * <p>
- * Example:
+ *
+ * <p>Example:
  * <pre>{@code
  *     new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
  *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
  *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")
  * }</pre>
  *
- * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
+ * <p>This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
  *
  * @see DateTimeBucketer
  * @see StringWriter
@@ -157,7 +159,7 @@ public class BucketingSink<T>
 
 	private static final long serialVersionUID = 1L;
 
-	private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
+	private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
 
 	// --------------------------------------------------------------------------------------------
 	//  User configuration values
@@ -167,69 +169,68 @@ public class BucketingSink<T>
 	/**
 	 * The default maximum size of part files (currently {@code 384 MB}).
 	 */
-	private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+	private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
 
 	/**
 	 * The default time between checks for inactive buckets. By default, {60 sec}.
 	 */
-	private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
+	private static final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
 
 	/**
 	 * The default threshold (in {@code ms}) for marking a bucket as inactive and
 	 * closing its part files. By default, {60 sec}.
 	 */
-	private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
+	private static final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
 
 	/**
 	 * The suffix for {@code in-progress} part files. These are files we are
 	 * currently writing to, but which were not yet confirmed by a checkpoint.
 	 */
-	private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
 
 	/**
 	 * The prefix for {@code in-progress} part files. These are files we are
 	 * currently writing to, but which were not yet confirmed by a checkpoint.
 	 */
-	private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+	private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
 
 	/**
 	 * The suffix for {@code pending} part files. These are closed files that we are
 	 * not currently writing to (inactive or reached {@link #batchSize}), but which
 	 * were not yet confirmed by a checkpoint.
 	 */
-	private final String DEFAULT_PENDING_SUFFIX = ".pending";
+	private static final String DEFAULT_PENDING_SUFFIX = ".pending";
 
 	/**
 	 * The prefix for {@code pending} part files. These are closed files that we are
 	 * not currently writing to (inactive or reached {@link #batchSize}), but which
 	 * were not yet confirmed by a checkpoint.
 	 */
-	private final String DEFAULT_PENDING_PREFIX = "_";
+	private static final String DEFAULT_PENDING_PREFIX = "_";
 
 	/**
 	 * When {@code truncate()} is not supported by the used {@link FileSystem}, we create
 	 * a file along the part file with this suffix that contains the length up to which
 	 * the part file is valid.
 	 */
-	private final String DEFAULT_VALID_SUFFIX = ".valid-length";
+	private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
 
 	/**
 	 * When {@code truncate()} is not supported by the used {@link FileSystem}, we create
 	 * a file along the part file with this preffix that contains the length up to which
 	 * the part file is valid.
 	 */
-	private final String DEFAULT_VALID_PREFIX = "_";
+	private static final String DEFAULT_VALID_PREFIX = "_";
 
 	/**
 	 * The default prefix for part files.
 	 */
-	private final String DEFAULT_PART_REFIX = "part";
+	private static final String DEFAULT_PART_REFIX = "part";
 
 	/**
 	 * The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
 	 */
-	private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
-
+	private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
 
 	/**
 	 * The base {@code Path} that stores all bucket directories.
@@ -259,7 +260,7 @@ public class BucketingSink<T>
 	private String pendingPrefix = DEFAULT_PENDING_PREFIX;
 
 	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
-	private String validLengthPrefix= DEFAULT_VALID_PREFIX;
+	private String validLengthPrefix = DEFAULT_VALID_PREFIX;
 
 	private String partPrefix = DEFAULT_PART_REFIX;
 
@@ -273,7 +274,7 @@ public class BucketingSink<T>
 	// -------------------------------------------§-------------------------------------------------
 
 	/**
-	 * We use reflection to get the .truncate() method, this is only available starting with Hadoop 2.7
+	 * We use reflection to get the .truncate() method, this is only available starting with Hadoop 2.7 .
 	 */
 	private transient Method refTruncate;
 
@@ -286,7 +287,7 @@ public class BucketingSink<T>
 	private transient ListState<State<T>> restoredBucketStates;
 
 	/**
-	 * User-defined FileSystem parameters
+	 * User-defined FileSystem parameters.
 	 */
 	private Configuration fsConfig;
 
@@ -302,8 +303,8 @@ public class BucketingSink<T>
 	/**
 	 * Creates a new {@code BucketingSink} that writes files to the given base directory.
 	 *
-	 * <p>
-	 * This uses a{@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} has writer.
+	 *
+	 * <p>This uses a{@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} has writer.
 	 * The maximum bucket size is set to 384 MB.
 	 *
 	 * @param basePath The directory to which to write the bucket files.
@@ -330,7 +331,7 @@ public class BucketingSink<T>
 	 */
 	public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
 		this.fsConfig = new Configuration();
-		for(Map.Entry<String, String> entry : config) {
+		for (Map.Entry<String, String> entry : config) {
 			fsConfig.setString(entry.getKey(), entry.getValue());
 		}
 		return this;
@@ -572,12 +573,12 @@ public class BucketingSink<T>
 
 	/**
 	 * Gets the truncate() call using reflection.
-	 * <p>
-	 * <b>NOTE:</b> This code comes from Flume.
+	 *
+	 * <p><b>NOTE:</b> This code comes from Flume.
 	 */
 	private Method reflectTruncate(FileSystem fs) {
 		Method m = null;
-		if(fs != null) {
+		if (fs != null) {
 			Class<?> fsClass = fs.getClass();
 			try {
 				m = fsClass.getMethod("truncate", Path.class, long.class);
@@ -897,8 +898,8 @@ public class BucketingSink<T>
 	/**
 	 * Sets the maximum bucket size in bytes.
 	 *
-	 * <p>
-	 * When a bucket part file becomes larger than this size a new bucket part file is started and
+	 *
+	 * <p>When a bucket part file becomes larger than this size a new bucket part file is started and
 	 * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
 	 *
 	 * @param batchSize The bucket part file size in bytes.
@@ -1008,8 +1009,8 @@ public class BucketingSink<T>
 	/**
 	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
 	 *
-	 * <p>
-	 * This should only be disabled if using the sink without checkpoints, to not remove
+	 *
+	 * <p>This should only be disabled if using the sink without checkpoints, to not remove
 	 * the files already in the directory.
 	 *
 	 * @deprecated This option is deprecated and remains only for backwards compatibility.

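For orientation, a minimal usage sketch of the BucketingSink configured with the defaults described in the Javadoc above (DateTimeBucketer, StringWriter, 384 MB bucket size). The output path, input elements, and class name are placeholders, and the fluent setters are assumed to behave as documented; this is an illustrative sketch, not code from this commit.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    public class BucketingSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> input = env.fromElements("a", "b", "c"); // placeholder input

            BucketingSink<String> sink = new BucketingSink<String>("file:///tmp/bucketing-out") // placeholder path
                    .setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH")) // hourly buckets, the documented default format
                    .setWriter(new StringWriter<String>())                       // newline-separated strings, the documented default writer
                    .setBatchSize(1024L * 1024L * 384L);                          // roll part files at 384 MB, the documented default size

            input.addSink(sink);
            env.execute("bucketing-sink-sketch");
        }
    }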
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
index b985e14..b7035fe 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.flink.streaming.connectors.fs.Clock;
+
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
@@ -28,27 +30,27 @@ import java.util.Date;
 /**
  * A {@link Bucketer} that assigns to buckets based on current system time.
  *
- * <p>
- * The {@code DateTimeBucketer} will create directories of the following form:
+ *
+ * <p>The {@code DateTimeBucketer} will create directories of the following form:
  * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
  * that was specified as a base path when creating the
  * {@link BucketingSink}. The {@code dateTimePath}
  * is determined based on the current system time and the user provided format string.
  *
- * <p>
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
+ *
+ * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
  * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
  * files will have a granularity of hours.
  *
  *
- * <p>
- * Example:
+ *
+ * <p>Example:
  *
  * <pre>{@code
  *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
  * }</pre>
  *
- * This will create for example the following bucket path:
+ * <p>This will create for example the following bucket path:
  * {@code /base/1976-12-31--14/}
  *
  */

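For reference, a small sketch of the path derivation the DateTimeBucketer Javadoc above describes: the format string is fed to SimpleDateFormat over the current time and the result is appended to the sink's base path. The base path and class name below are placeholders; this only mirrors the documented behaviour, it is not the bucketer implementation itself.

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class BucketPathSketch {
        public static void main(String[] args) {
            String basePath = "/base";                                      // placeholder for the sink's base directory
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd--HH");  // the bucketer's default format string
            String bucketPath = basePath + "/" + fmt.format(new Date());
            System.out.println(bucketPath);                                 // e.g. /base/1976-12-31--14
        }
    }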
http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 2d8492f..b096db4 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs;
 
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
 import org.apache.flink.util.NetUtils;
+
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -55,8 +57,8 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
  *
- * <p>
- * This test only verifies the exactly once behaviour of the sink. Another test tests the
+ *
+ * <p>This test only verifies the exactly once behaviour of the sink. Another test tests the
  * rolling behaviour.
  *
  * @deprecated should be removed with the {@link RollingSink}.
@@ -64,7 +66,7 @@ import static org.junit.Assert.assertTrue;
 @Deprecated
 public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
 
-	final long NUM_STRINGS = 16_000;
+	private static final long NUM_STRINGS = 16_000;
 
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -105,10 +107,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
-		int PARALLELISM = 12;
-
 		env.enableCheckpointing(20);
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(12);
 		env.disableOperatorChaining();
 
 		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
@@ -211,7 +211,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 		private long failurePos;
 		private long count;
 
-
 		OnceFailingIdentityMapper(long numElements) {
 			this.numElements = numElements;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 72f2f21..10d1846 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericData.StringType;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -45,9 +36,18 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -77,8 +76,8 @@ import java.util.Map;
  * tests test the different output methods as well as the rolling feature using a manual clock
  * that increases time in lockstep with element computation using latches.
  *
- * <p>
- * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
+ *
+ * <p>This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
  * exactly once behaviour.
  *
  * @deprecated should be removed with the {@link RollingSink}.
@@ -128,13 +127,12 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 	 */
 	@Test
 	public void testNonRollingStringWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
+		final int numElements = 20;
 		final String outPath = hdfsURI + "/string-non-rolling-out";
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(2);
 
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
 				.broadcast()
 				.filter(new OddEvenFilter());
 
@@ -145,7 +143,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 				.setPendingSuffix("");
 
 		source
-				.map(new MapFunction<Tuple2<Integer,String>, String>() {
+				.map(new MapFunction<Tuple2<Integer, String>, String>() {
 					private static final long serialVersionUID = 1L;
 					@Override
 					public String map(Tuple2<Integer, String> value) throws Exception {
@@ -160,7 +158,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
 
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 0; i < numElements; i += 2) {
 			String line = br.readLine();
 			Assert.assertEquals("message #" + i, line);
 		}
@@ -171,7 +169,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		br = new BufferedReader(new InputStreamReader(inStream));
 
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 1; i < numElements; i += 2) {
 			String line = br.readLine();
 			Assert.assertEquals("message #" + i, line);
 		}
@@ -185,17 +183,16 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 	 */
 	@Test
 	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
+		final int numElements = 20;
 		final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(2);
 
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
 				.broadcast()
 				.filter(new OddEvenFilter());
 
-		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+		DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -204,7 +201,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-
 		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
 				.setWriter(new SequenceFileWriter<IntWritable, Text>())
 				.setBucketer(new NonRollingBucketer())
@@ -227,7 +223,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		IntWritable intWritable = new IntWritable();
 		Text txt = new Text();
 
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 0; i < numElements; i += 2) {
 			reader.next(intWritable, txt);
 			Assert.assertEquals(i, intWritable.get());
 			Assert.assertEquals("message #" + i, txt.toString());
@@ -244,7 +240,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 				100000,
 				new Configuration());
 
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 1; i < numElements; i += 2) {
 			reader.next(intWritable, txt);
 			Assert.assertEquals(i, intWritable.get());
 			Assert.assertEquals("message #" + i, txt.toString());
@@ -260,17 +256,16 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 	 */
 	@Test
 	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
+		final int numElements = 20;
 		final String outPath = hdfsURI + "/seq-non-rolling-out";
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(2);
 
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
 				.broadcast()
 				.filter(new OddEvenFilter());
 
-		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+		DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -279,7 +274,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-
 		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
 				.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
 				.setBucketer(new NonRollingBucketer())
@@ -302,7 +296,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		IntWritable intWritable = new IntWritable();
 		Text txt = new Text();
 
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 0; i < numElements; i += 2) {
 			reader.next(intWritable, txt);
 			Assert.assertEquals(i, intWritable.get());
 			Assert.assertEquals("message #" + i, txt.toString());
@@ -319,7 +313,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 				100000,
 				new Configuration());
 
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 1; i < numElements; i += 2) {
 			reader.next(intWritable, txt);
 			Assert.assertEquals(i, intWritable.get());
 			Assert.assertEquals("message #" + i, txt.toString());
@@ -328,25 +322,22 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		reader.close();
 		inStream.close();
 	}
-	
-	
+
 	/**
 	 * This tests {@link AvroKeyValueSinkWriter}
 	 * with non-rolling output and without compression.
 	 */
 	@Test
 	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
+		final int numElements = 20;
 		final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(2);
 
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
 				.broadcast()
 				.filter(new OddEvenFilter());
 
-
 		Map<String, String> properties = new HashMap<>();
 		Schema keySchema = Schema.create(Type.INT);
 		Schema valueSchema = Schema.create(Type.STRING);
@@ -369,7 +360,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
 		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
 		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 0; i < numElements; i += 2) {
 			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
 			int key = wrappedEntry.getKey().intValue();
 			Assert.assertEquals(i, key);
@@ -383,7 +374,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		inStream = dfs.open(new Path(outPath + "/part-1-0"));
 		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
 
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 1; i < numElements; i += 2) {
 			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
 			int key = wrappedEntry.getKey().intValue();
 			Assert.assertEquals(i, key);
@@ -394,24 +385,22 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		dataFileStream.close();
 		inStream.close();
 	}
-	
+
 	/**
 	 * This tests {@link AvroKeyValueSinkWriter}
 	 * with non-rolling output and with compression.
 	 */
 	@Test
 	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
+		final int numElements = 20;
 		final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(2);
 
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
 				.broadcast()
 				.filter(new OddEvenFilter());
 
-
 		Map<String, String> properties = new HashMap<>();
 		Schema keySchema = Schema.create(Type.INT);
 		Schema valueSchema = Schema.create(Type.STRING);
@@ -436,7 +425,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
 		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
 		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 0; i < numElements; i += 2) {
 			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
 			int key = wrappedEntry.getKey().intValue();
 			Assert.assertEquals(i, key);
@@ -450,7 +439,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		inStream = dfs.open(new Path(outPath + "/part-1-0"));
 		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
 
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 1; i < numElements; i += 2) {
 			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
 			int key = wrappedEntry.getKey().intValue();
 			Assert.assertEquals(i, key);
@@ -462,20 +451,18 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		inStream.close();
 	}
 
-
 	/**
-	 * This tests user defined hdfs configuration
+	 * This tests user-defined HDFS configuration.
 	 * @throws Exception
      */
 	@Test
 	public void testUserDefinedConfiguration() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
+		final int numElements = 20;
 		final String outPath = hdfsURI + "/string-non-rolling-with-config";
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(2);
 
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
 			.broadcast()
 			.filter(new OddEvenFilter());
 
@@ -490,7 +477,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 			.setPendingSuffix("");
 
 		source
-			.map(new MapFunction<Tuple2<Integer,String>, String>() {
+			.map(new MapFunction<Tuple2<Integer, String>, String>() {
 				private static final long serialVersionUID = 1L;
 				@Override
 				public String map(Tuple2<Integer, String> value) throws Exception {
@@ -505,7 +492,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
 
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 0; i < numElements; i += 2) {
 			String line = br.readLine();
 			Assert.assertEquals("message #" + i, line);
 		}
@@ -516,7 +503,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		br = new BufferedReader(new InputStreamReader(inStream));
 
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+		for (int i = 1; i < numElements; i += 2) {
 			String line = br.readLine();
 			Assert.assertEquals("message #" + i, line);
 		}
@@ -525,8 +512,8 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	// we use this to synchronize the clock changes to elements being processed
-	final static MultiShotLatch latch1 = new MultiShotLatch();
-	final static MultiShotLatch latch2 = new MultiShotLatch();
+	private static final MultiShotLatch latch1 = new MultiShotLatch();
+	private static final MultiShotLatch latch2 = new MultiShotLatch();
 
 	/**
 	 * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
@@ -536,19 +523,16 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 	 */
 	@Test
 	public void testDateTimeRollingStringWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
+		final int numElements = 20;
 		final String outPath = hdfsURI + "/rolling-out";
 		DateTimeBucketer.setClock(new ModifyableClock());
 		ModifyableClock.setCurrentTime(0);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
-
-
+		env.setParallelism(2);
 
 		DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
-				NUM_ELEMENTS))
+				numElements))
 				.broadcast();
 
 		// the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
@@ -664,7 +648,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		testHarness.processElement(new StreamRecord<>("test1", 1L));
 		testHarness.processElement(new StreamRecord<>("test2", 1L));
-		checkFs(outDir, 1, 1 ,0, 0);
+		checkFs(outDir, 1, 1, 0, 0);
 
 		testHarness.processElement(new StreamRecord<>("test3", 1L));
 		checkFs(outDir, 1, 2, 0, 0);
@@ -961,7 +945,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
+	private static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -974,7 +958,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	public static class ModifyableClock implements Clock {
+	private static class ModifyableClock implements Clock {
 
 		private static volatile long currentTime = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 768ca5e..6bd75d4 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -26,9 +26,10 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
-import org.apache.flink.test.util.TestingSecurityContext;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestingSecurityContext;
 import org.apache.flink.util.NetUtils;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.http.HttpConfig;
@@ -47,21 +48,21 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 
 /**
  * Tests for running {@link RollingSinkSecuredITCase} which is an extension of {@link RollingSink} in secure environment
- * Note: only executed for Hadoop version > 3.x.x
+ * Note: only executed for Hadoop version > 3.x.x.
  */
 public class RollingSinkSecuredITCase extends RollingSinkITCase {
 
@@ -141,7 +142,6 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
 		TestBaseUtils.setEnv(map);
 
-
 		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
 		builder.checkDataNodeAddrConfig(true);
 		builder.checkDataNodeHostConfig(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/7292c874/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
index 85f23b6..1ed4a7f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
 import org.apache.flink.util.NetUtils;
+
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -37,11 +39,11 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.BufferedReader;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -55,13 +57,13 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for {@link BucketingSink}.
  *
- * <p>
- * This test only verifies the exactly once behaviour of the sink. Another test tests the
+ *
+ * <p>This test only verifies the exactly once behaviour of the sink. Another test tests the
  * rolling behaviour.
  */
 public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
 
-	final long NUM_STRINGS = 16_000;
+	static final long NUM_STRINGS = 16_000;
 
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -102,10 +104,8 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
 	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
-		int PARALLELISM = 12;
-
 		env.enableCheckpointing(20);
-		env.setParallelism(PARALLELISM);
+		env.setParallelism(12);
 		env.disableOperatorChaining();
 
 		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
@@ -208,7 +208,6 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
 		private long failurePos;
 		private long count;
 
-
 		OnceFailingIdentityMapper(long numElements) {
 			this.numElements = numElements;
 		}