You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:31 UTC

[14/17] flink git commit: [hotfix] [core] Fix checkstyle for 'flink-core' : 'org.apache.flink.configuration'

[hotfix] [core] Fix checkstyle for 'flink-core' : 'org.apache.flink.configuration'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f9c2d97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f9c2d97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f9c2d97

Branch: refs/heads/master
Commit: 1f9c2d9740ffea2b59b8f5f3da287a0dc890ddbf
Parents: 212ee3d
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jan 18 17:27:13 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:04 2018 +0100

----------------------------------------------------------------------
 flink-core/pom.xml                              |  21 +-
 .../apache/flink/configuration/AkkaOptions.java |   1 -
 .../flink/configuration/BlobServerOptions.java  |   2 +-
 .../flink/configuration/ConfigConstants.java    | 219 ++++++++++---------
 .../apache/flink/configuration/ConfigGroup.java |   6 +-
 .../flink/configuration/ConfigGroups.java       |   4 +-
 .../flink/configuration/ConfigOption.java       |  16 +-
 .../flink/configuration/ConfigOptions.java      |  22 +-
 .../ConfigOptionsDocGenerator.java              |   1 +
 .../flink/configuration/Configuration.java      | 117 +++++-----
 .../apache/flink/configuration/CoreOptions.java |   1 -
 .../configuration/DelegatingConfiguration.java  |   8 +-
 .../configuration/GlobalConfiguration.java      |  33 +--
 .../configuration/HeartbeatManagerOptions.java  |   6 +-
 .../configuration/HighAvailabilityOptions.java  |  28 +--
 .../configuration/HistoryServerOptions.java     |   1 +
 .../IllegalConfigurationException.java          |   2 +-
 .../flink/configuration/JobManagerOptions.java  |  14 +-
 .../apache/flink/configuration/MemorySize.java  |  28 +--
 .../flink/configuration/MetricOptions.java      |   6 +-
 .../configuration/ResourceManagerOptions.java   |   6 +-
 .../apache/flink/configuration/RestOptions.java |   1 +
 .../flink/configuration/TaskManagerOptions.java |  14 +-
 .../UnmodifiableConfiguration.java              |   5 +-
 .../apache/flink/configuration/WebOptions.java  |   9 +-
 .../ConfigDocsCompletenessChecker.java          |  12 +-
 .../ConfigOptionsDocGeneratorTest.java          | 117 +++++-----
 .../flink/configuration/ConfigurationTest.java  |  40 ++--
 .../DelegatingConfigurationTest.java            |  14 +-
 .../FilesystemSchemeConfigTest.java             |   3 +
 .../configuration/GlobalConfigurationTest.java  |  15 +-
 .../flink/configuration/MemorySizeTest.java     |   4 +-
 .../UnmodifiableConfigurationTest.java          |  17 +-
 tools/maven/suppressions-core.xml               |   8 -
 34 files changed, 421 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 0465227..285074e 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -204,7 +204,7 @@ under the License.
 							<exclude>org.apache.flink.api.java.typeutils.AvroTypeInfo</exclude>
 							<!-- Breaking changes between 1.1 and 1.2.
 							We ignore these changes because these are low-level, internal runtime configuration parameters -->
-							<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants</exclude>
 							<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL</exclude>
 							<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_NETWORK_REQUEST_BACKOFF_MAX</exclude>
 							<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS</exclude>
@@ -212,6 +212,25 @@ under the License.
 							<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_INITIAL_KEY</exclude>
 							<exclude>org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_MAX_KEY</exclude>
 
+							<!-- fields that were accidentally not final in the beginning -->
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_RECOVERY_MODE</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_SECURITY_SSL_ENABLED</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_STATE_BACKEND</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_FRAMESIZE</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE</exclude>
+							<exclude>org.apache.flink.configuration.ConfigConstants.DEFAULT_AKKA_SSL_ENABLED</exclude>
+
 							<!-- apparently there is a bug in the plugin which makes it fail on this new file, event though
 								its new, and not conflicting/breaking -->
 							<exclude>org.apache.flink.api.common.serialization.DeserializationSchema</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index cf65939..09ea490 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * Akka configuration options.
- *
  * TODO: Migrate other akka config options to this file
  */
 @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
index 8680096..a813d01 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -62,7 +62,7 @@ public class BlobServerOptions {
 	 * a range of ports: "50100-50200"
 	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
 	 *
-	 * Setting the port to 0 will let the OS choose an available port.
+	 * <p>Setting the port to 0 will let the OS choose an available port.
 	 */
 	public static final ConfigOption<String> PORT =
 		key("blob.server.port")

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d1dcef5..7b76f53 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -31,12 +31,13 @@ import static org.apache.flink.configuration.ConfigOptions.key;
  * the default values.
  */
 @Public
+@SuppressWarnings("unused")
 public final class ConfigConstants {
 
 	// ------------------------------------------------------------------------
 	//                            Configuration Keys
 	// ------------------------------------------------------------------------
-	
+
 	// ---------------------------- Parallelism -------------------------------
 
 	/**
@@ -112,7 +113,7 @@ public final class ConfigConstants {
 	@Deprecated
 	@PublicEvolving
 	public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
-	
+
 	// -------------------------------- Runtime -------------------------------
 
 	/**
@@ -202,7 +203,7 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_DATA_PORT_KEY = "taskmanager.data.port";
 
 	/**
-	 * Config parameter to override SSL support for taskmanager's data transport
+	 * Config parameter to override SSL support for taskmanager's data transport.
 	 */
 	public static final String TASK_MANAGER_DATA_SSL_ENABLED = "taskmanager.data.ssl.enabled";
 
@@ -216,7 +217,7 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dirs";
 
 	/**
-	 * The config parameter defining the taskmanager log file location
+	 * The config parameter defining the taskmanager log file location.
 	 */
 	public static final String TASK_MANAGER_LOG_PATH_KEY = "taskmanager.log.path";
 
@@ -229,7 +230,7 @@ public final class ConfigConstants {
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_SIZE_KEY = "taskmanager.memory.size";
-	
+
 	/**
 	 * The config parameter defining the fraction of free memory allocated by the memory manager.
 	 *
@@ -271,7 +272,7 @@ public final class ConfigConstants {
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size";
-	
+
 	/**
 	 * The implementation to use for spillable/spilled intermediate results, which have both
 	 * synchronous and asynchronous implementations: "sync" or "async".
@@ -323,14 +324,14 @@ public final class ConfigConstants {
 	public static final String TASK_CANCELLATION_INTERVAL_MILLIS = "task.cancellation-interval";
 
 	// --------------------------- Runtime Algorithms -------------------------------
-	
+
 	/**
 	 * Parameter for the maximum fan for out-of-core algorithms.
 	 * Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
-	 * for hybrid hash joins. 
+	 * for hybrid hash joins.
 	 */
 	public static final String DEFAULT_SPILLING_MAX_FAN_KEY = "taskmanager.runtime.max-fan";
-	
+
 	/**
 	 * Key for the default spilling threshold. When more than the threshold memory of the sort buffers is full, the
 	 * sorter will start spilling to disk.
@@ -341,7 +342,7 @@ public final class ConfigConstants {
 	 * Parameter to switch hash join bloom filters for spilled partitions on and off.
 	 */
 	public static final String RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY = "taskmanager.runtime.hashjoin-bloom-filters";
-	
+
 	/**
 	 * The config parameter defining the timeout for filesystem stream opening.
 	 * A value of 0 indicates infinite waiting.
@@ -383,13 +384,13 @@ public final class ConfigConstants {
 
 	/**
 	 * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
-	 * setting custom environment variables for the workers (TaskManagers)
+	 * setting custom environment variables for the workers (TaskManagers).
 	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_TASK_MANAGER_ENV_PREFIX} instead.
 	 */
 	@Deprecated
 	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
 
-	
+
 	// ------------------------ YARN Configuration ------------------------
 
 	/**
@@ -425,7 +426,8 @@ public final class ConfigConstants {
 	 * The maximum number of failed YARN containers before entirely stopping
 	 * the YARN session / job on YARN.
 	 *
-	 * By default, we take the number of initially requested containers.
+	 * <p>By default, we take the number of initially requested containers.
+	 *
 	 * @deprecated in favor of {@code YarnConfigOptions#MAX_FAILED_CONTAINERS}.
 	 */
 	@Deprecated
@@ -435,7 +437,8 @@ public final class ConfigConstants {
 	 * Set the number of retries for failed YARN ApplicationMasters/JobManagers in high
 	 * availability mode. This value is usually limited by YARN.
 	 *
-	 * By default, it's 1 in the standalone case and 2 in the high availability case.
+	 * <p>By default, it's 1 in the standalone case and 2 in the high availability case.
+	 *
 	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_ATTEMPTS}.
 	 */
 	@Deprecated
@@ -443,7 +446,6 @@ public final class ConfigConstants {
 
 	/**
 	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
-	 *
 	 * The default value is 5 (seconds).
 	 * @deprecated in favor of {@code YarnConfigOptions#HEARTBEAT_DELAY_SECONDS}.
 	 */
@@ -492,16 +494,17 @@ public final class ConfigConstants {
 	 */
 	public static final String YARN_CONTAINER_START_COMMAND_TEMPLATE =
 		"yarn.container-start-command-template";
-	
+
 	 /**
 	 * The config parameter defining the Akka actor system port for the ApplicationMaster and
 	 * JobManager
 	 *
-	 * The port can either be a port, such as "9123",
+	 * <p>The port can either be a port, such as "9123",
 	 * a range of ports: "50100-50200"
 	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
 	 *
-	 * Setting the port to 0 will let the OS choose an available port.
+	 * <p>Setting the port to 0 will let the OS choose an available port.
+	 *
 	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
 	 */
 	@Deprecated
@@ -528,7 +531,8 @@ public final class ConfigConstants {
 	 * The maximum number of failed Mesos tasks before entirely stopping
 	 * the Mesos session / job on Mesos.
 	 *
-	 * By default, we take the number of initially requested tasks.
+	 * <p>By default, we take the number of initially requested tasks.
+	 *
 	 * @deprecated in favor of {@code MesosOptions#MAX_FAILED_TASKS}.
 	 */
 	@Deprecated
@@ -537,7 +541,7 @@ public final class ConfigConstants {
 	/**
 	 * The Mesos master URL.
 	 *
-	 * The value should be in one of the following forms:
+	 * <p>The value should be in one of the following forms:
 	 * <pre>
 	 * {@code
 	 *     host:port
@@ -554,7 +558,8 @@ public final class ConfigConstants {
 	/**
 	 * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
 	 *
-	 * The default value is 600 (seconds).
+	 * <p>The default value is 600 (seconds).
+	 *
 	 * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
 	 */
 	@Deprecated
@@ -589,7 +594,7 @@ public final class ConfigConstants {
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
 
 	/**
-	 * Config parameter to override SSL support for the Artifact Server
+	 * Config parameter to override SSL support for the Artifact Server.
 	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
 	 */
 	@Deprecated
@@ -598,23 +603,23 @@ public final class ConfigConstants {
 	// ------------------------ Hadoop Configuration ------------------------
 
 	/**
-	 * Path to hdfs-default.xml file
+	 * Path to hdfs-default.xml file.
 	 *
 	 * @deprecated Use environment variable HADOOP_CONF_DIR instead.
 	 */
 	@Deprecated
 	public static final String HDFS_DEFAULT_CONFIG = "fs.hdfs.hdfsdefault";
-	
+
 	/**
-	 * Path to hdfs-site.xml file
+	 * Path to hdfs-site.xml file.
 	 *
 	 * @deprecated Use environment variable HADOOP_CONF_DIR instead.
 	 */
 	@Deprecated
 	public static final String HDFS_SITE_CONFIG = "fs.hdfs.hdfssite";
-	
+
 	/**
-	 * Path to Hadoop configuration
+	 * Path to Hadoop configuration.
 	 *
 	 * @deprecated Use environment variable HADOOP_CONF_DIR instead.
 	 */
@@ -661,8 +666,8 @@ public final class ConfigConstants {
 	 * The maximum length of a single sampled record before the sampling is aborted.
 	 */
 	public static final String DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY = "compiler.delimited-informat.max-sample-len";
-	
-	
+
+
 	// ------------------------- JobManager Web Frontend ----------------------
 
 	/**
@@ -674,7 +679,7 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
 
 	/**
-	 * Config parameter to override SSL support for the JobManager Web UI
+	 * Config parameter to override SSL support for the JobManager Web UI.
 	 *
 	 * @deprecated Use {@link WebOptions#SSL_ENABLED} instead.
 	 */
@@ -699,7 +704,7 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_WEB_UPLOAD_DIR_KEY = "jobmanager.web.upload.dir";
 
 	/**
-	 * The config parameter defining the number of archived jobs for the jobmanager
+	 * The config parameter defining the number of archived jobs for the jobmanager.
 	 *
 	 * @deprecated Use {@link WebOptions#ARCHIVE_COUNT} instead.
 	 */
@@ -707,7 +712,7 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
 
 	/**
-	 * The log file location (may be in /log for standalone but under log directory when using YARN)
+	 * The log file location (may be in /log for standalone but under log directory when using YARN).
 	 *
 	 * @deprecated Use {@link WebOptions#LOG_PATH} instead.
 	 */
@@ -773,7 +778,7 @@ public final class ConfigConstants {
 	// ------------------------------ AKKA ------------------------------------
 
 	/**
-	 * Timeout for the startup of the actor system
+	 * Timeout for the startup of the actor system.
 	 *
 	 * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead.
 	 */
@@ -781,7 +786,7 @@ public final class ConfigConstants {
 	public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";
 
 	/**
-	 * Heartbeat interval of the transport failure detector
+	 * Heartbeat interval of the transport failure detector.
 	 *
 	 * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
 	 */
@@ -789,7 +794,7 @@ public final class ConfigConstants {
 	public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval";
 
 	/**
-	 * Allowed heartbeat pause for the transport failure detector
+	 * Allowed heartbeat pause for the transport failure detector.
 	 *
 	 * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
 	 */
@@ -797,7 +802,7 @@ public final class ConfigConstants {
 	public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = "akka.transport.heartbeat.pause";
 
 	/**
-	 * Detection threshold of transport failure detector
+	 * Detection threshold of transport failure detector.
 	 *
 	 * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
 	 */
@@ -805,7 +810,7 @@ public final class ConfigConstants {
 	public static final String AKKA_TRANSPORT_THRESHOLD = "akka.transport.threshold";
 
 	/**
-	 * Heartbeat interval of watch failure detector
+	 * Heartbeat interval of watch failure detector.
 	 *
 	 * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_INTERVAL} instead.
 	 */
@@ -813,7 +818,7 @@ public final class ConfigConstants {
 	public static final String AKKA_WATCH_HEARTBEAT_INTERVAL = "akka.watch.heartbeat.interval";
 
 	/**
-	 * Allowed heartbeat pause for the watch failure detector
+	 * Allowed heartbeat pause for the watch failure detector.
 	 *
 	 * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_PAUSE} instead.
 	 */
@@ -821,7 +826,7 @@ public final class ConfigConstants {
 	public static final String AKKA_WATCH_HEARTBEAT_PAUSE = "akka.watch.heartbeat.pause";
 
 	/**
-	 * Detection threshold for the phi accrual watch failure detector
+	 * Detection threshold for the phi accrual watch failure detector.
 	 *
 	 * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
 	 */
@@ -829,7 +834,7 @@ public final class ConfigConstants {
 	public static final String AKKA_WATCH_THRESHOLD = "akka.watch.threshold";
 
 	/**
-	 * Akka TCP timeout
+	 * Akka TCP timeout.
 	 *
 	 * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead.
 	 */
@@ -837,7 +842,7 @@ public final class ConfigConstants {
 	public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout";
 
 	/**
-	 * Override SSL support for the Akka transport
+	 * Override SSL support for the Akka transport.
 	 *
 	 * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
 	 */
@@ -845,7 +850,7 @@ public final class ConfigConstants {
 	public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled";
 
 	/**
-	 * Maximum framesize of akka messages
+	 * Maximum framesize of akka messages.
 	 *
 	 * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
 	 */
@@ -853,7 +858,7 @@ public final class ConfigConstants {
 	public static final String AKKA_FRAMESIZE = "akka.framesize";
 
 	/**
-	 * Maximum number of messages until another actor is executed by the same thread
+	 * Maximum number of messages until another actor is executed by the same thread.
 	 *
 	 * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
 	 */
@@ -861,7 +866,7 @@ public final class ConfigConstants {
 	public static final String AKKA_DISPATCHER_THROUGHPUT = "akka.throughput";
 
 	/**
-	 * Log lifecycle events
+	 * Log lifecycle events.
 	 *
 	 * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
 	 */
@@ -869,7 +874,7 @@ public final class ConfigConstants {
 	public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events";
 
 	/**
-	 * Timeout for all blocking calls on the cluster side
+	 * Timeout for all blocking calls on the cluster side.
 	 *
 	 * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
 	 */
@@ -877,7 +882,7 @@ public final class ConfigConstants {
 	public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
 
 	/**
-	 * Timeout for all blocking calls that look up remote actors
+	 * Timeout for all blocking calls that look up remote actors.
 	 *
 	 * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
 	 */
@@ -885,7 +890,7 @@ public final class ConfigConstants {
 	public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
 
 	/**
-	 * Timeout for all blocking calls on the client side
+	 * Timeout for all blocking calls on the client side.
 	 *
 	 * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
 	 */
@@ -893,13 +898,13 @@ public final class ConfigConstants {
 	public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";
 
 	/**
-	 * Exit JVM on fatal Akka errors
+	 * Exit JVM on fatal Akka errors.
 	 *
 	 * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
 	 */
 	@Deprecated
 	public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
-	
+
 	// ----------------------------- Transport SSL Settings--------------------
 
 	/**
@@ -957,21 +962,21 @@ public final class ConfigConstants {
 	public static final String SECURITY_SSL_VERIFY_HOSTNAME = "security.ssl.verify-hostname";
 
 	// ----------------------------- Streaming --------------------------------
-	
+
 	/**
-	 * State backend for checkpoints
-	 * 
+	 * State backend for checkpoints.
+	 *
 	 * @deprecated Use {@link CheckpointingOptions#STATE_BACKEND} instead.
 	 */
 	@Deprecated
 	public static final String STATE_BACKEND = "state.backend";
-	
+
 	// ----------------------------- Miscellaneous ----------------------------
-	
+
 	/**
 	 * The key to the Flink base directory path. Was initially used for configurations of the
 	 * web UI, but outdated now.
-	 * 
+	 *
 	 * @deprecated This parameter should not be used any more. A running Flink cluster should
 	 *             make no assumption about its location.
 	 */
@@ -984,15 +989,15 @@ public final class ConfigConstants {
 
 	// --------------------------- High Availability --------------------------
 
-	/** Defines high availability mode used for the cluster execution ("NONE", "ZOOKEEPER") */
+	/** Defines high availability mode used for the cluster execution ("NONE", "ZOOKEEPER"). */
 	@PublicEvolving
 	public static final String HA_MODE = "high-availability";
 
-	/** Ports used by the job manager if not in 'none' recovery mode */
+	/** Ports used by the job manager if not in 'none' recovery mode. */
 	@PublicEvolving
 	public static final String HA_JOB_MANAGER_PORT = "high-availability.jobmanager.port";
 
-	/** The time before the JobManager recovers persisted jobs */
+	/** The time before the JobManager recovers persisted jobs. */
 	@PublicEvolving
 	public static final String HA_JOB_DELAY = "high-availability.job.delay";
 
@@ -1187,7 +1192,7 @@ public final class ConfigConstants {
 
 	/** The class of the reporter to use. This is used as a suffix in an actual reporter config */
 	public static final String METRICS_REPORTER_CLASS_SUFFIX = "class";
-	
+
 	/** The interval between reports. This is used as a suffix in an actual reporter config */
 	public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval";
 
@@ -1258,12 +1263,12 @@ public final class ConfigConstants {
 	// ------------------------------------------------------------------------
 
 	// ---------------------------- Parallelism -------------------------------
-	
+
 	/**
 	 * The default parallelism for operations.
 	 */
 	public static final int DEFAULT_PARALLELISM = 1;
-	
+
 	/**
 	 * The default number of execution retries.
 	 */
@@ -1272,14 +1277,14 @@ public final class ConfigConstants {
 	// ------------------------------ Runtime ---------------------------------
 
 	/**
-	 * The default library cache manager cleanup interval in seconds
+	 * The default library cache manager cleanup interval in seconds.
 	 *
 	 * @deprecated use {@link BlobServerOptions#CLEANUP_INTERVAL} instead
 	 */
 	@Deprecated
 	public static final long DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL =
 		BlobServerOptions.CLEANUP_INTERVAL.defaultValue();
-	
+
 	/**
 	 * The default network port to connect to for communication with the job manager.
 	 */
@@ -1335,7 +1340,7 @@ public final class ConfigConstants {
 	public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0;
 
 	/**
-	 * The default value to override ssl support for task manager's data transport
+	 * The default value to override ssl support for task manager's data transport.
 	 */
 	public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = true;
 
@@ -1388,7 +1393,7 @@ public final class ConfigConstants {
 	public static final long DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = 5000L;
 
 	/**
-	 * The default task manager's maximum registration duration
+	 * The default task manager's maximum registration duration.
 	 */
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
 
@@ -1420,22 +1425,22 @@ public final class ConfigConstants {
 	public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 30000;
 
 	// ------------------------ Runtime Algorithms ------------------------
-	
+
 	/**
 	 * Default setting for the switch for hash join bloom filters for spilled partitions.
 	 */
 	public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = false;
-	
+
 	/**
 	 * The default value for the maximum spilling fan in/out.
 	 */
 	public static final int DEFAULT_SPILLING_MAX_FAN = 128;
-	
+
 	/**
 	 * The default percentage of the sort memory to be full before data is spilled.
 	 */
 	public static final float DEFAULT_SORT_SPILLING_THRESHOLD = 0.8f;
-	
+
 	/**
 	 * The default timeout for filesystem stream opening: infinite (means max long milliseconds).
 	 */
@@ -1466,7 +1471,7 @@ public final class ConfigConstants {
 	public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
 
 	/**
-	 * Start command template for Flink on YARN containers
+	 * Start command template for Flink on YARN containers.
 	 */
 	public static final String DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE =
 		"%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";
@@ -1483,7 +1488,7 @@ public final class ConfigConstants {
 	// For more configuration entries please see {@code MesosTaskManagerParameters}.
 
 	/**
-	 * The default failover timeout provided to Mesos (10 mins)
+	 * The default failover timeout provided to Mesos (10 mins).
 	 * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
 	 */
 	@Deprecated
@@ -1522,12 +1527,12 @@ public final class ConfigConstants {
 
 	/**
 	 * The default filesystem to be used, if no other scheme is specified in the
-	 * user-provided URI (= local filesystem)
-	 * */
+	 * user-provided URI (= local filesystem).
+	 */
 	public static final String DEFAULT_FILESYSTEM_SCHEME = "file:///";
-	
+
 	/**
-	 * The default behavior with respect to overwriting existing files (= not overwrite)
+	 * The default behavior with respect to overwriting existing files (= not overwrite).
 	 */
 	public static final boolean DEFAULT_FILESYSTEM_OVERWRITE = false;
 
@@ -1535,26 +1540,26 @@ public final class ConfigConstants {
 	 * The default behavior for output directory creating (create only directory when parallelism &gt; 1).
 	 */
 	public static final boolean DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY = false;
-	
-	
+
+
 	// ---------------------------- Compiler -------------------------------
 
 	/**
 	 * The default maximum number of line samples taken by the delimited input format.
 	 */
 	public static final int DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES = 10;
-	
+
 	/**
 	 * The default minimum number of line samples taken by the delimited input format.
 	 */
 	public static final int DEFAULT_DELIMITED_FORMAT_MIN_LINE_SAMPLES = 2;
-	
+
 	/**
 	 * The default maximum sample length before sampling is aborted (2 MiBytes).
 	 */
 	public static final int DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN = 2 * 1024 * 1024;
-	
-	
+
+
 	// ------------------------- JobManager Web Frontend ----------------------
 
 	/**
@@ -1577,7 +1582,7 @@ public final class ConfigConstants {
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
 
 	/**
-	 * Default value to override SSL support for the JobManager web UI
+	 * Default value to override SSL support for the JobManager web UI.
 	 *
 	 * @deprecated use {@link WebOptions#SSL_ENABLED} instead
 	 */
@@ -1585,7 +1590,7 @@ public final class ConfigConstants {
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true;
 
 	/**
-	 * The default number of archived jobs for the jobmanager
+	 * The default number of archived jobs for the jobmanager.
 	 *
 	 * @deprecated use {@link WebOptions#ARCHIVE_COUNT} instead
 	 */
@@ -1650,67 +1655,67 @@ public final class ConfigConstants {
 	 * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
 	 */
 	@Deprecated
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
+	public static final String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
 	 */
 	@Deprecated
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
+	public static final String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
 	 */
 	@Deprecated
-	public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
+	public static final double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
 	 */
 	@Deprecated
-	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
+	public static final double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
 	 */
 	@Deprecated
-	public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
+	public static final int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
 	 */
 	@Deprecated
-	public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
+	public static final boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
 	 */
 	@Deprecated
-	public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
+	public static final String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
 	 */
 	@Deprecated
-	public static String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";
+	public static final String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
 	 */
 	@Deprecated
-	public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
+	public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
 	 */
 	@Deprecated
-	public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
+	public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
 
 	/**
 	 * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
 	 */
 	@Deprecated
-	public static boolean DEFAULT_AKKA_SSL_ENABLED = true;
+	public static final boolean DEFAULT_AKKA_SSL_ENABLED = true;
 
 	// ----------------------------- SSL Values --------------------------------
 
@@ -1718,34 +1723,34 @@ public final class ConfigConstants {
 	 * @deprecated use {@link SecurityOptions#SSL_ENABLED} instead
 	 */
 	@Deprecated
-	public static boolean DEFAULT_SECURITY_SSL_ENABLED = false;
+	public static final boolean DEFAULT_SECURITY_SSL_ENABLED = false;
 
 	/**
 	 * @deprecated use {@link SecurityOptions#SSL_PROTOCOL} instead
 	 */
 	@Deprecated
-	public static String DEFAULT_SECURITY_SSL_PROTOCOL = "TLSv1.2";
+	public static final String DEFAULT_SECURITY_SSL_PROTOCOL = "TLSv1.2";
 
 	/**
 	 * @deprecated use {@link SecurityOptions#SSL_ALGORITHMS} instead
 	 */
 	@Deprecated
-	public static String DEFAULT_SECURITY_SSL_ALGORITHMS = "TLS_RSA_WITH_AES_128_CBC_SHA";
+	public static final String DEFAULT_SECURITY_SSL_ALGORITHMS = "TLS_RSA_WITH_AES_128_CBC_SHA";
 
 	/**
 	 * @deprecated use {@link SecurityOptions#SSL_VERIFY_HOSTNAME} instead
 	 */
 	@Deprecated
-	public static boolean DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME = true;
+	public static final boolean DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME = true;
 
 	// ----------------------------- Streaming Values --------------------------
-	
-	public static String DEFAULT_STATE_BACKEND = "jobmanager";
+
+	public static final String DEFAULT_STATE_BACKEND = "jobmanager";
 
 	// ----------------------------- LocalExecution ----------------------------
 
 	/**
-	 * Sets the number of local task managers
+	 * Sets the number of local task managers.
 	 */
 	public static final String LOCAL_NUMBER_TASK_MANAGER = "local.number-taskmanager";
 
@@ -1772,11 +1777,11 @@ public final class ConfigConstants {
 	// --------------------------- High Availability ---------------------------------
 
 	@PublicEvolving
-	public static String DEFAULT_HA_MODE = "none";
+	public static final String DEFAULT_HA_MODE = "none";
 
 	/** @deprecated Deprecated in favour of {@link #DEFAULT_HA_MODE} */
 	@Deprecated
-	public static String DEFAULT_RECOVERY_MODE = "standalone";
+	public static final String DEFAULT_RECOVERY_MODE = "standalone";
 
 	/**
 	 * Default port used by the job manager if not in standalone recovery mode. If <code>0</code>
@@ -1898,16 +1903,16 @@ public final class ConfigConstants {
 
 	// ----------------------------- Environment Variables ----------------------------
 
-	/** The environment variable name which contains the location of the configuration directory */
+	/** The environment variable name which contains the location of the configuration directory. */
 	public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
 
-	/** The environment variable name which contains the location of the lib folder */
+	/** The environment variable name which contains the location of the lib folder. */
 	public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
-	/** The environment variable name which contains the location of the bin directory */
+	/** The environment variable name which contains the location of the bin directory. */
 	public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
 
-	/** The environment variable name which contains the Flink installation root directory */
+	/** The environment variable name which contains the Flink installation root directory. */
 	public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
 
 	// ---------------------------- Encoding ------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
index a0ad000..3cd1d7f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroup.java
@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
-import java.lang.annotation.Target;
 import org.apache.flink.annotation.Internal;
 
+import java.lang.annotation.Target;
+
 /**
  * A class that specifies a group of {@link ConfigOption}. The name of the group will be used as the basis for the
  * filename of the generated html file, as defined in {@link ConfigOptionsDocGenerator}.
- * 
+ *
  * @see ConfigGroups
  */
 @Target({})

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
index 2c1f871..94e52ee 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigGroups.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.Internal;
+
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import org.apache.flink.annotation.Internal;
 
 /**
  * Annotation used on classes containing {@link ConfigOption}s that enables the separation of options into different

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
index 22d2cc4..8e7d79b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
@@ -29,10 +29,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A {@code ConfigOption} describes a configuration parameter. It encapsulates
  * the configuration key, deprecated older versions of the key, and an optional
  * default value for the configuration parameter.
- * 
+ *
  * <p>{@code ConfigOptions} are built via the {@link ConfigOptions} class.
  * Once created, a config option is immutable.
- * 
+ *
  * @param <T> The type of value associated with the configuration option.
  */
 @PublicEvolving
@@ -42,16 +42,16 @@ public class ConfigOption<T> {
 
 	// ------------------------------------------------------------------------
 
-	/** The current key for that config option */
+	/** The current key for that config option. */
 	private final String key;
 
-	/** The list of deprecated keys, in the order to be checked */
+	/** The list of deprecated keys, in the order to be checked. */
 	private final String[] deprecatedKeys;
 
-	/** The default value for this option */
+	/** The default value for this option. */
 	private final T defaultValue;
 
-	/** The description for this option */
+	/** The description for this option. */
 	private final String description;
 
 	// ------------------------------------------------------------------------
@@ -88,11 +88,11 @@ public class ConfigOption<T> {
 	/**
 	 * Creates a new config option, using this option's key and default value, and
 	 * adding the given deprecated keys.
-	 * 
+	 *
 	 * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
 	 * the deprecated keys will be checked in the order provided to this method. The first key for which
 	 * a value is found will be used - that value will be returned.
-	 * 
+	 *
 	 * @param deprecatedKeys The deprecated keys, in the order in which they should be checked.
 	 * @return A new config options, with the given deprecated keys.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
index f87da0a..1ec5b3c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
@@ -25,23 +25,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * {@code ConfigOptions} are used to build a {@link ConfigOption}.
  * The option is typically built in one of the following pattern:
- * 
+ *
  * <pre>{@code
  * // simple string-valued option with a default value
  * ConfigOption<String> tempDirs = ConfigOptions
  *     .key("tmp.dir")
  *     .defaultValue("/tmp");
- * 
+ *
  * // simple integer-valued option with a default value
  * ConfigOption<Integer> parallelism = ConfigOptions
  *     .key("application.parallelism")
  *     .defaultValue(100);
- * 
+ *
  * // option with no default value
  * ConfigOption<String> userName = ConfigOptions
  *     .key("user.name")
  *     .noDefaultValue();
- * 
+ *
  * // option with deprecated keys to check
  * ConfigOption<Double> threshold = ConfigOptions
  *     .key("cpu.utilization.threshold")
@@ -54,7 +54,7 @@ public class ConfigOptions {
 
 	/**
 	 * Starts building a new {@link ConfigOption}.
-	 * 
+	 *
 	 * @param key The key for the config option.
 	 * @return The builder for the config option with the given key.
 	 */
@@ -71,7 +71,7 @@ public class ConfigOptions {
 	 */
 	public static final class OptionBuilder {
 
-		/** The key for the config option */
+		/** The key for the config option. */
 		private final String key;
 
 		/**
@@ -84,24 +84,24 @@ public class ConfigOptions {
 
 		/**
 		 * Creates a ConfigOption with the given default value.
-		 * 
+		 *
 		 * <p>This method does not accept "null". For options with no default value, choose
 		 * one of the {@code noDefaultValue} methods.
-		 * 
+		 *
 		 * @param value The default value for the config option
 		 * @param <T> The type of the default value.
 		 * @return The config option with the default value.
 		 */
 		public <T> ConfigOption<T> defaultValue(T value) {
 			checkNotNull(value);
-			return new ConfigOption<T>(key, value);
+			return new ConfigOption<>(key, value);
 		}
 
 		/**
 		 * Creates a string-valued option with no default value.
 		 * String-valued options are the only ones that can have no
 		 * default value.
-		 * 
+		 *
 		 * @return The created ConfigOption.
 		 */
 		public ConfigOption<String> noDefaultValue() {
@@ -111,6 +111,6 @@ public class ConfigOptions {
 
 	// ------------------------------------------------------------------------
 
-	/** Not intended to be instantiated */
+	/** Not intended to be instantiated. */
 	private ConfigOptions() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
index 1769c35..02ce7bb 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index dfcd04f..7d99fbb 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.configuration;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -33,14 +25,23 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 /**
  * Lightweight configuration object which stores key/value pairs.
  */
 @Public
-public class Configuration extends ExecutionConfig.GlobalJobParameters 
+public class Configuration extends ExecutionConfig.GlobalJobParameters
 		implements IOReadableWritable, java.io.Serializable, Cloneable {
 
 	private static final long serialVersionUID = 1L;
@@ -66,29 +67,29 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	 * Creates a new empty configuration.
 	 */
 	public Configuration() {
-		this.confData = new HashMap<String, Object>();
+		this.confData = new HashMap<>();
 	}
 
 	/**
 	 * Creates a new configuration with the copy of the given configuration.
-	 * 
+	 *
 	 * @param other The configuration to copy the entries from.
 	 */
 	public Configuration(Configuration other) {
-		this.confData = new HashMap<String, Object>(other.confData);
+		this.confData = new HashMap<>(other.confData);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Returns the class associated with the given key as a string.
-	 * 
+	 *
 	 * @param <T> The type of the class to return.
 
 	 * @param key The key pointing to the associated value
 	 * @param defaultValue The optional default value returned if no entry exists
 	 * @param classLoader The class loader used to resolve the class.
-	 * 
+	 *
 	 * @return The value associated with the given key, or the default value, if to entry for the key exists.
 	 */
 	@SuppressWarnings("unchecked")
@@ -97,11 +98,11 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		if (o == null) {
 			return (Class<T>) defaultValue;
 		}
-		
+
 		if (o.getClass() == String.class) {
 			return (Class<T>) Class.forName((String) o, true, classLoader);
 		}
-		
+
 		LOG.warn("Configuration cannot evaluate value " + o + " as a class name");
 		return (Class<T>) defaultValue;
 	}
@@ -109,7 +110,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	/**
 	 * Adds the given key/value pair to the configuration object. The class can be retrieved by invoking
 	 * {@link #getClass(String, Class, ClassLoader)} if it is in the scope of the class loader on the caller.
-	 * 
+	 *
 	 * @param key The key of the pair to be added
 	 * @param klazz The value of the pair to be added
 	 * @see #getClass(String, Class, ClassLoader)
@@ -120,7 +121,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Returns the value associated with the given key as a string.
-	 * 
+	 *
 	 * @param key
 	 *        the key pointing to the associated value
 	 * @param defaultValue
@@ -164,7 +165,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Adds the given key/value pair to the configuration object.
-	 * 
+	 *
 	 * @param key
 	 *        the key of the key/value pair to be added
 	 * @param value
@@ -190,7 +191,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Returns the value associated with the given key as an integer.
-	 * 
+	 *
 	 * @param key
 	 *        the key pointing to the associated value
 	 * @param defaultValue
@@ -238,7 +239,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Adds the given key/value pair to the configuration object.
-	 * 
+	 *
 	 * @param key
 	 *        the key of the key/value pair to be added
 	 * @param value
@@ -264,7 +265,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Returns the value associated with the given key as a long.
-	 * 
+	 *
 	 * @param key
 	 *        the key pointing to the associated value
 	 * @param defaultValue
@@ -312,7 +313,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Adds the given key/value pair to the configuration object.
-	 * 
+	 *
 	 * @param key
 	 *        the key of the key/value pair to be added
 	 * @param value
@@ -338,7 +339,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Returns the value associated with the given key as a boolean.
-	 * 
+	 *
 	 * @param key
 	 *        the key pointing to the associated value
 	 * @param defaultValue
@@ -386,7 +387,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Adds the given key/value pair to the configuration object.
-	 * 
+	 *
 	 * @param key
 	 *        the key of the key/value pair to be added
 	 * @param value
@@ -412,7 +413,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Returns the value associated with the given key as a float.
-	 * 
+	 *
 	 * @param key
 	 *        the key pointing to the associated value
 	 * @param defaultValue
@@ -460,7 +461,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Adds the given key/value pair to the configuration object.
-	 * 
+	 *
 	 * @param key
 	 *        the key of the key/value pair to be added
 	 * @param value
@@ -486,7 +487,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Returns the value associated with the given key as a double.
-	 * 
+	 *
 	 * @param key
 	 *        the key pointing to the associated value
 	 * @param defaultValue
@@ -534,7 +535,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Adds the given key/value pair to the configuration object.
-	 * 
+	 *
 	 * @param key
 	 *        the key of the key/value pair to be added
 	 * @param value
@@ -560,7 +561,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 	/**
 	 * Returns the value associated with the given key as a byte array.
-	 * 
+	 *
 	 * @param key
 	 *        The key pointing to the associated value.
 	 * @param defaultValue
@@ -569,7 +570,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	 */
 	@SuppressWarnings("EqualsBetweenInconvertibleTypes")
 	public byte[] getBytes(String key, byte[] defaultValue) {
-		
+
 		Object o = getRawValue(key);
 		if (o == null) {
 			return defaultValue;
@@ -582,10 +583,10 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			return defaultValue;
 		}
 	}
-	
+
 	/**
 	 * Adds the given byte array to the configuration object. If key is <code>null</code> then nothing is added.
-	 * 
+	 *
 	 * @param key
 	 *        The key under which the bytes are added.
 	 * @param bytes
@@ -608,16 +609,16 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the keys of all key/value pairs stored inside this
 	 * configuration object.
-	 * 
+	 *
 	 * @return the keys of all key/value pairs stored inside this configuration object
 	 */
 	public Set<String> keySet() {
 		synchronized (this.confData) {
-			return new HashSet<String>(this.confData.keySet());
+			return new HashSet<>(this.confData.keySet());
 		}
 	}
 
@@ -639,11 +640,11 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			}
 		}
 	}
-	
+
 	/**
 	 * Adds all entries from the given configuration into this configuration. The keys
 	 * are prepended with the given prefix.
-	 * 
+	 *
 	 * @param other
 	 *        The configuration whose entries are added to this configuration.
 	 * @param prefix
@@ -674,7 +675,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
-	 * Checks whether there is an entry with the specified key
+	 * Checks whether there is an entry with the specified key.
 	 *
 	 * @param key key of entry
 	 * @return true if the key is stored, false otherwise
@@ -686,7 +687,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
-	 * Checks whether there is an entry for the given config option
+	 * Checks whether there is an entry for the given config option.
 	 *
 	 * @param configOption The configuration option
 	 *
@@ -720,8 +721,8 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	@Override
 	public Map<String, String> toMap() {
 		synchronized (this.confData){
-			Map<String, String> ret = new HashMap<String, String>(this.confData.size());
-			for(Map.Entry<String, Object> entry : confData.entrySet()) {
+			Map<String, String> ret = new HashMap<>(this.confData.size());
+			for (Map.Entry<String, Object> entry : confData.entrySet()) {
 				ret.put(entry.getKey(), entry.getValue().toString());
 			}
 			return ret;
@@ -730,7 +731,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	<T> void setValueInternal(String key, T value) {
 		if (key == null) {
 			throw new NullPointerException("Key must not be null.");
@@ -738,17 +739,17 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		if (value == null) {
 			throw new NullPointerException("Value must not be null.");
 		}
-		
+
 		synchronized (this.confData) {
 			this.confData.put(key, value);
 		}
 	}
-	
+
 	private Object getRawValue(String key) {
 		if (key == null) {
 			throw new NullPointerException("Key must not be null.");
 		}
-		
+
 		synchronized (this.confData) {
 			return this.confData.get(key);
 		}
@@ -767,7 +768,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			for (String deprecatedKey : configOption.deprecatedKeys()) {
 				Object oo = getRawValue(deprecatedKey);
 				if (oo != null) {
-					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'", 
+					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
 							deprecatedKey, configOption.key());
 					return oo;
 				}
@@ -893,7 +894,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			for (int i = 0; i < numberOfProperties; i++) {
 				String key = StringValue.readString(in);
 				Object value;
-				
+
 				byte type = in.readByte();
 				switch (type) {
 					case TYPE_STRING:
@@ -922,7 +923,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 					default:
 						throw new IOException("Unrecognized type: " + type);
 				}
-				
+
 				this.confData.put(key, value);
 			}
 		}
@@ -932,14 +933,14 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	public void write(final DataOutputView out) throws IOException {
 		synchronized (this.confData) {
 			out.writeInt(this.confData.size());
-			
+
 			for (Map.Entry<String, Object> entry : this.confData.entrySet()) {
 				String key = entry.getKey();
 				Object val = entry.getValue();
-						
+
 				StringValue.writeString(key, out);
 				Class<?> clazz = val.getClass();
-				
+
 				if (clazz == String.class) {
 					out.write(TYPE_STRING);
 					StringValue.writeString((String) val, out);
@@ -976,7 +977,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -996,11 +997,11 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		}
 		else if (obj instanceof Configuration) {
 			Map<String, Object> otherConf = ((Configuration) obj).confData;
-			
+
 			for (Map.Entry<String, Object> e : this.confData.entrySet()) {
 				Object thisVal = e.getValue();
 				Object otherVal = otherConf.get(e.getKey());
-				
+
 				if (!thisVal.getClass().equals(byte[].class)) {
 					if (!thisVal.equals(otherVal)) {
 						return false;
@@ -1013,14 +1014,14 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 					return false;
 				}
 			}
-			
+
 			return true;
 		}
 		else {
 			return false;
 		}
 	}
-	
+
 	@Override
 	public String toString() {
 		return this.confData.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index f31ad8c..1182ed5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -81,7 +81,6 @@ public class CoreOptions {
 		.key("classloader.parent-first-patterns")
 		.defaultValue("java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
 
-
 	// ------------------------------------------------------------------------
 	//  process parameters
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 8cac66c..7b75c7a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -111,7 +111,7 @@ public final class DelegatingConfiguration extends Configuration {
 	public int getInteger(ConfigOption<Integer> configOption) {
 		return  this.backingConfig.getInteger(prefixOption(configOption, prefix));
 	}
-	
+
 	@Override
 	public int getInteger(ConfigOption<Integer> configOption, int overrideDefault) {
 		return this.backingConfig.getInteger(configOption, overrideDefault);
@@ -282,7 +282,7 @@ public final class DelegatingConfiguration extends Configuration {
 			return this.backingConfig.keySet();
 		}
 
-		final HashSet<String> set = new HashSet<String>();
+		final HashSet<String> set = new HashSet<>();
 		int prefixLen = this.prefix.length();
 
 		for (String key : this.backingConfig.keySet()) {
@@ -307,7 +307,7 @@ public final class DelegatingConfiguration extends Configuration {
 			prefixed.put(prefix + entry.getKey(), entry.getValue());
 		}
 
-		return prefixed; 
+		return prefixed;
 	}
 
 	@Override
@@ -367,7 +367,7 @@ public final class DelegatingConfiguration extends Configuration {
 		}
 
 		String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]);
-		return new ConfigOption<T>(key,
+		return new ConfigOption<>(key,
 			option.description(),
 			option.defaultValue(),
 			deprecated);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 4569ebe..2f2a9cf 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -18,18 +18,19 @@
 
 package org.apache.flink.configuration;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
 import org.apache.flink.annotation.Internal;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
 /**
  * Global configuration object for Flink. Similar to Java properties configuration
  * objects it includes key-value pairs which represent the framework's configuration.
@@ -65,8 +66,8 @@ public final class GlobalConfiguration {
 
 	/**
 	 * Loads the configuration files from the specified directory.
-	 * <p>
-	 * YAML files are supported as configuration files.
+	 *
+	 * <p>YAML files are supported as configuration files.
 	 *
 	 * @param configDir
 	 *        the directory which contains the configuration files
@@ -132,21 +133,21 @@ public final class GlobalConfiguration {
 
 	/**
 	 * Loads a YAML-file of key-value pairs.
-	 * <p>
-	 * Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a single-line comment.
-	 * <p>
-	 * Example:
-	 * 
+	 *
+	 * <p>Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a single-line comment.
+	 *
+	 * <p>Example:
+	 *
 	 * <pre>
 	 * jobmanager.rpc.address: localhost # network address for communication with the job manager
 	 * jobmanager.rpc.port   : 6123      # network port to connect to for communication with the job manager
 	 * taskmanager.rpc.port  : 6122      # network port the task manager expects incoming IPC connections
 	 * </pre>
-	 * <p>
-	 * This does not span the whole YAML specification, but only the *syntax* of simple YAML key-value pairs (see issue
+	 *
+	 * <p>This does not span the whole YAML specification, but only the *syntax* of simple YAML key-value pairs (see issue
 	 * #113 on GitHub). If at any point in time, there is a need to go beyond simple key-value pairs syntax
 	 * compatibility will allow to introduce a YAML parser library.
-	 * 
+	 *
 	 * @param file the YAML file to read from
 	 * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
index 81cbc5d..e2f6ff6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
@@ -28,18 +28,18 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 @PublicEvolving
 public class HeartbeatManagerOptions {
 
-	/** Time interval for requesting heartbeat from sender side */
+	/** Time interval for requesting heartbeat from sender side. */
 	public static final ConfigOption<Long> HEARTBEAT_INTERVAL =
 			key("heartbeat.interval")
 			.defaultValue(10000L);
 
-	/** Timeout for requesting and receiving heartbeat for both sender and receiver sides */
+	/** Timeout for requesting and receiving heartbeat for both sender and receiver sides. */
 	public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
 			key("heartbeat.timeout")
 			.defaultValue(50000L);
 
 	// ------------------------------------------------------------------------
 
-	/** Not intended to be instantiated */
+	/** Not intended to be instantiated. */
 	private HeartbeatManagerOptions() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 6ee9f94..0039fdd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -32,33 +32,33 @@ public class HighAvailabilityOptions {
 	//  Required High Availability Options
 	// ------------------------------------------------------------------------
 
-	/** 
+	/**
 	 * Defines high-availability mode used for the cluster execution.
 	 * A value of "NONE" signals no highly available setup.
 	 * To enable high-availability, set this mode to "ZOOKEEPER".
 	 */
-	public static final ConfigOption<String> HA_MODE = 
+	public static final ConfigOption<String> HA_MODE =
 			key("high-availability")
 			.defaultValue("NONE")
 			.withDeprecatedKeys("recovery.mode");
 
 	/**
-	 * The ID of the Flink cluster, used to separate multiple Flink clusters 
+	 * The ID of the Flink cluster, used to separate multiple Flink clusters
 	 * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
 	 */
-	public static final ConfigOption<String> HA_CLUSTER_ID = 
+	public static final ConfigOption<String> HA_CLUSTER_ID =
 			key("high-availability.cluster-id")
 			.defaultValue("/default")
 			.withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace");
 
 	/**
-	 * File system path (URI) where Flink persists metadata in high-availability setups
+	 * File system path (URI) where Flink persists metadata in high-availability setups.
 	 */
 	public static final ConfigOption<String> HA_STORAGE_PATH =
 			key("high-availability.storageDir")
 			.noDefaultValue()
 			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir");
-	
+
 
 	// ------------------------------------------------------------------------
 	//  Recovery Options
@@ -67,7 +67,7 @@ public class HighAvailabilityOptions {
 	/**
 	 * Optional port (range) used by the job manager in high-availability mode.
 	 */
-	public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE = 
+	public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
 			key("high-availability.jobmanager.port")
 			.defaultValue("0")
 			.withDeprecatedKeys("recovery.jobmanager.port");
@@ -75,7 +75,7 @@ public class HighAvailabilityOptions {
 	/**
 	 * The time before a JobManager after a fail over recovers the current jobs.
 	 */
-	public static final ConfigOption<String> HA_JOB_DELAY = 
+	public static final ConfigOption<String> HA_JOB_DELAY =
 			key("high-availability.job.delay")
 			.noDefaultValue()
 			.withDeprecatedKeys("recovery.job.delay");
@@ -93,7 +93,7 @@ public class HighAvailabilityOptions {
 			.withDeprecatedKeys("recovery.zookeeper.quorum");
 
 	/**
-	 * The root path under which Flink stores its entries in ZooKeeper
+	 * The root path under which Flink stores its entries in ZooKeeper.
 	 */
 	public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
 			key("high-availability.zookeeper.path.root")
@@ -139,7 +139,7 @@ public class HighAvailabilityOptions {
 	//  ZooKeeper Client Settings
 	// ------------------------------------------------------------------------
 
-	public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT = 
+	public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
 			key("high-availability.zookeeper.client.session-timeout")
 			.defaultValue(60000)
 			.withDeprecatedKeys("recovery.zookeeper.client.session-timeout");
@@ -149,17 +149,17 @@ public class HighAvailabilityOptions {
 			.defaultValue(15000)
 			.withDeprecatedKeys("recovery.zookeeper.client.connection-timeout");
 
-	public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT = 
+	public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
 			key("high-availability.zookeeper.client.retry-wait")
 			.defaultValue(5000)
 			.withDeprecatedKeys("recovery.zookeeper.client.retry-wait");
 
-	public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS = 
+	public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
 			key("high-availability.zookeeper.client.max-retry-attempts")
 			.defaultValue(3)
 			.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
 
-	public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH = 
+	public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
 			key("high-availability.zookeeper.path.running-registry")
 			.defaultValue("/running_job_registry/");
 
@@ -169,6 +169,6 @@ public class HighAvailabilityOptions {
 
 	// ------------------------------------------------------------------------
 
-	/** Not intended to be instantiated */
+	/** Not intended to be instantiated. */
 	private HighAvailabilityOptions() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 27c56d4..c7c6933 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
index fb1e5a8..6f85c16 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
@@ -33,7 +33,7 @@ public class IllegalConfigurationException extends RuntimeException {
 
 	/**
 	 * Constructs an new IllegalConfigurationException with the given error message.
-	 * 
+	 *
 	 * @param message The error message for the exception.
 	 */
 	public IllegalConfigurationException(String message) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index ef3306e..1e22a24 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.configuration;
 
-import static org.apache.flink.configuration.ConfigOptions.key;
-
 import org.apache.flink.annotation.PublicEvolving;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /**
  * Configuration options for the JobManager.
  */
@@ -31,8 +31,8 @@ public class JobManagerOptions {
 	/**
 	 * The config parameter defining the network address to connect to
 	 * for communication with the job manager.
-	 * 
-	 * <p>This value is only interpreted in setups where a single JobManager with static 
+	 *
+	 * <p>This value is only interpreted in setups where a single JobManager with static
 	 * name or address exists (simple standalone setups, or container setups with dynamic
 	 * service name resolution). It is not used in many high-availability setups, when a
 	 * leader-election service (like ZooKeeper) is used to elect and discover the JobManager
@@ -45,7 +45,7 @@ public class JobManagerOptions {
 	/**
 	 * The config parameter defining the network port to connect to
 	 * for communication with the job manager.
-	 * 
+	 *
 	 * <p>Like {@link JobManagerOptions#ADDRESS}, this value is only interpreted in setups where
 	 * a single JobManager with static name/address and port exists (simple standalone setups,
 	 * or container setups with dynamic service name resolution).
@@ -58,7 +58,7 @@ public class JobManagerOptions {
 		.defaultValue(6123);
 
 	/**
-	 * JVM heap size (in megabytes) for the JobManager
+	 * JVM heap size (in megabytes) for the JobManager.
 	 */
 	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
 		key("jobmanager.heap.mb")
@@ -83,7 +83,7 @@ public class JobManagerOptions {
 	 * This option specifies the interval in order to trigger a resource manager reconnection if the connection
 	 * to the resource manager has been lost.
 	 *
-	 * This option is only intended for internal use.
+	 * <p>This option is only intended for internal use.
 	 */
 	public static final ConfigOption<Long> RESOURCE_MANAGER_RECONNECT_INTERVAL =
 		key("jobmanager.resourcemanager.reconnect-interval")

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java b/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
index ff38837..fba2a68 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
@@ -27,14 +27,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * MemorySize is a representation of a number of bytes, viewable in different units.
- * 
+ *
  * <h2>Parsing</h2>
- * 
- * The size can be parsed from a text expression. If the expression is a pure number,
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure number,
  * the value will be interpreted as bytes.
- * 
+ *
  * <p>To make larger values more compact, the common size suffixes are supported:
- * 
+ *
  * <ul>
  *     <li>q or 1b or 1bytes (bytes)
  *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
@@ -63,12 +63,12 @@ public class MemorySize implements java.io.Serializable {
 
 	// ------------------------------------------------------------------------
 
-	/** The memory size, in bytes */
+	/** The memory size, in bytes. */
 	private final long bytes;
 
 	/**
 	 * Constructs a new MemorySize.
-	 * 
+	 *
 	 * @param bytes The size, in bytes. Must be zero or larger.
 	 */
 	public MemorySize(long bytes) {
@@ -122,7 +122,7 @@ public class MemorySize implements java.io.Serializable {
 
 	@Override
 	public boolean equals(Object obj) {
-		return obj == this || 
+		return obj == this ||
 				(obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);
 	}
 
@@ -138,10 +138,10 @@ public class MemorySize implements java.io.Serializable {
 	/**
 	 * Parses the given string as as MemorySize.
 	 * The supported expressions are listed under {@link MemorySize}.
-	 * 
+	 *
 	 * @param text The string to parse
 	 * @return The parsed MemorySize
-	 * 
+	 *
 	 * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
 	 */
 	public static MemorySize parse(String text) throws IllegalArgumentException {
@@ -151,10 +151,10 @@ public class MemorySize implements java.io.Serializable {
 	/**
 	 * Parses the given string as bytes.
 	 * The supported expressions are listed under {@link MemorySize}.
-	 * 
+	 *
 	 * @param text The string to parse
 	 * @return The parsed size, in bytes.
-	 * 
+	 *
 	 * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
 	 */
 	public static long parseBytes(String text) throws IllegalArgumentException {
@@ -208,7 +208,7 @@ public class MemorySize implements java.io.Serializable {
 				multiplier = 1024L * 1024L * 1024L * 1024L;
 			}
 			else {
-				throw new IllegalArgumentException("Memory size unit '" + unit + 
+				throw new IllegalArgumentException("Memory size unit '" + unit +
 						"' does not match any of the recognized units: " + ALL_UNITS);
 			}
 		}
@@ -217,7 +217,7 @@ public class MemorySize implements java.io.Serializable {
 
 		// check for overflow
 		if (result / multiplier != value) {
-			throw new IllegalArgumentException("The value '" + text + 
+			throw new IllegalArgumentException("The value '" + text +
 					"' cannot be re represented as 64bit number of bytes (numeric overflow).");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 42eb575..24655fe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -15,12 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
+/**
+ * Configuration options for metrics and metric reporters.
+ */
 @PublicEvolving
 public class MetricOptions {
 
@@ -78,7 +82,7 @@ public class MetricOptions {
 		key("metrics.scope.operator")
 			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>");
 
-	/** The number of measured latencies to maintain at each operator */
+	/** The number of measured latencies to maintain at each operator. */
 	public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
 		key("metrics.latency.history-size")
 			.defaultValue(128);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 7b3b551..1c943b6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -84,12 +84,12 @@ public class ResourceManagerOptions {
 
 	/**
 	 * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
-	 * setting custom environment variables for the workers (TaskManagers)
+	 * setting custom environment variables for the workers (TaskManagers).
 	 */
 	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
-	
+
 	// ---------------------------------------------------------------------------------------------
 
-	/** Not intended to be instantiated */
+	/** Not intended to be instantiated. */
 	private ResourceManagerOptions() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index a2a2013..906e266 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -27,6 +27,7 @@ import static org.apache.flink.configuration.ConfigOptions.key;
  */
 @Internal
 public class RestOptions {
+
 	/**
 	 * The address that the server binds itself to / the client connects to.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index fef0975..f2c2289 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -35,14 +35,14 @@ public class TaskManagerOptions {
 	// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
 
 	/**
-	 * JVM heap size (in megabytes) for the TaskManagers
+	 * JVM heap size (in megabytes) for the TaskManagers.
 	 */
 	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
 			key("taskmanager.heap.mb")
 			.defaultValue(1024);
 
 	/**
-	 * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError
+	 * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
 	 */
 	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
 			key("taskmanager.jvm-exit-on-oom")
@@ -61,7 +61,7 @@ public class TaskManagerOptions {
 	 * The default network port range the task manager expects incoming IPC connections. The {@code "0"} means that
 	 * the TaskManager searches for a free port.
 	 */
-	public static final ConfigOption<String> RPC_PORT = 
+	public static final ConfigOption<String> RPC_PORT =
 		key("taskmanager.rpc.port")
 			.defaultValue("0");
 
@@ -131,14 +131,14 @@ public class TaskManagerOptions {
 			.defaultValue(0.1f);
 
 	/**
-	 * Minimum memory size for network buffers (in bytes)
+	 * Minimum memory size for network buffers (in bytes).
 	 */
 	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
 			key("taskmanager.network.memory.min")
 			.defaultValue(64L << 20); // 64 MB
 
 	/**
-	 * Maximum memory size for network buffers (in bytes)
+	 * Maximum memory size for network buffers (in bytes).
 	 */
 	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
 			key("taskmanager.network.memory.max")
@@ -147,7 +147,7 @@ public class TaskManagerOptions {
 	/**
 	 * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
 	 *
-	 * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
+	 * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
 	 */
 	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
 			key("taskmanager.network.memory.buffers-per-channel")
@@ -219,6 +219,6 @@ public class TaskManagerOptions {
 
 	// ------------------------------------------------------------------------
 
-	/** Not intended to be instantiated */
+	/** Not intended to be instantiated. */
 	private TaskManagerOptions() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
index 24ad61e..f92de1c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -27,13 +27,13 @@ import java.util.Properties;
  */
 @Public
 public class UnmodifiableConfiguration extends Configuration {
-	
+
 	private static final long serialVersionUID = -8151292629158972280L;
 
 	/**
 	 * Creates a new UnmodifiableConfiguration, which holds a copy of the given configuration
 	 * that cannot be altered.
-	 * 
+	 *
 	 * @param config The configuration with the original contents.
 	 */
 	public UnmodifiableConfiguration(Configuration config) {
@@ -44,7 +44,6 @@ public class UnmodifiableConfiguration extends Configuration {
 	//  All mutating methods must fail
 	// --------------------------------------------------------------------------------------------
 
-
 	@Override
 	public void addAllToProperties(Properties props) {
 		// override to make the UnmodifiableConfigurationTest happy

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c2d97/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index 3733244..b74f23f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -27,6 +27,7 @@ import static org.apache.flink.configuration.ConfigOptions.key;
  */
 @PublicEvolving
 public class WebOptions {
+
 	/**
 	 * Config parameter defining the runtime monitor web-frontend server address.
 	 */
@@ -61,7 +62,7 @@ public class WebOptions {
 			.withDeprecatedKeys("jobmanager.web.refresh-interval");
 
 	/**
-	 * Config parameter to override SSL support for the JobManager Web UI
+	 * Config parameter to override SSL support for the JobManager Web UI.
 	 */
 	public static final ConfigOption<Boolean> SSL_ENABLED =
 		key("web.ssl.enabled")
@@ -156,8 +157,8 @@ public class WebOptions {
 		.key("web.timeout")
 		.defaultValue(10L * 1000L);
 
+	// ------------------------------------------------------------------------
 
-	private WebOptions() {
-		throw new IllegalAccessError();
-	}
+	/** Not meant to be instantiated. */
+	private WebOptions() {}
 }