You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/24 14:46:19 UTC

[1/8] flink git commit: [FLINK-6669] set inputEncoding to UTF-8 in scalastyle-maven-plugin

Repository: flink
Updated Branches:
  refs/heads/master 61914abff -> f827d730e


[FLINK-6669] set inputEncoding to UTF-8 in scalastyle-maven-plugin


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f827d730
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f827d730
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f827d730

Branch: refs/heads/master
Commit: f827d730e40ac1c71fe974d4fd674e55ad530cdb
Parents: ed65c25
Author: lingjinjiang <li...@gmail.com>
Authored: Tue May 23 10:51:46 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f827d730/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8c5f622..95fcf1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1337,6 +1337,7 @@ under the License.
 						<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
 						<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
 						<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
+						<inputEncoding>UTF-8</inputEncoding>
 						<outputEncoding>UTF-8</outputEncoding>
 					</configuration>
 				</plugin>


[8/8] flink git commit: [FLINK-6659] fix SavepointITCase leaving temporary data behind

Posted by ch...@apache.org.
[FLINK-6659] fix SavepointITCase leaving temporary data behind

-> use a JUnit '@Rule' that does the cleanup

This closes #3962.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe671476
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe671476
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe671476

Branch: refs/heads/master
Commit: fe6714760b0feb688aa2e8169b1a73028e42afd4
Parents: b58a420
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon May 22 16:55:39 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/test/checkpointing/SavepointITCase.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe671476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 72a1b63..1c8a429 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -68,7 +68,6 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -143,7 +142,7 @@ public class SavepointITCase extends TestLogger {
 		final int numSlotsPerTaskManager = 2;
 		final int parallelism = numTaskManagers * numSlotsPerTaskManager;
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-		final File testRoot = folder.newFolder();
+		final File testRoot = folder.getRoot();
 
 		TestingCluster flink = null;
 
@@ -424,7 +423,7 @@ public class SavepointITCase extends TestLogger {
 		// Test deadline
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
-		final File tmpDir = CommonTestUtils.createTempDirectory();
+		final File tmpDir = folder.getRoot();
 		final File savepointDir = new File(tmpDir, "savepoints");
 
 		TestingCluster flink = null;
@@ -494,7 +493,7 @@ public class SavepointITCase extends TestLogger {
 		// Test deadline
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
-		final File tmpDir = CommonTestUtils.createTempDirectory();
+		final File tmpDir = folder.getRoot();
 		final File savepointDir = new File(tmpDir, "savepoints");
 
 		TestingCluster flink = null;


[2/8] flink git commit: [FLINK-6320] fix unit test failing sometimes when deleting a temp directory

Posted by ch...@apache.org.
[FLINK-6320] fix unit test failing sometimes when deleting a temp directory

This closes #3966.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ad3d140
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ad3d140
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ad3d140

Branch: refs/heads/master
Commit: 6ad3d140f35722055c9011dbee88d19319cfbfbe
Parents: fe67147
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon May 22 16:31:08 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 .../JobManagerHAJobGraphRecoveryITCase.java     | 36 ++++++--------------
 1 file changed, 10 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ad3d140/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 052195a..80b8e18 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
@@ -58,7 +57,9 @@ import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
@@ -68,7 +69,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Queue;
 import java.util.UUID;
@@ -88,32 +88,16 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 	private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
 
-	private static final File FileStateBackendBasePath;
-
-	static {
-		try {
-			FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error in test setup. Could not create directory.", e);
-		}
-	}
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@AfterClass
 	public static void tearDown() throws Exception {
 		ZooKeeper.shutdown();
-
-		if (FileStateBackendBasePath != null) {
-			FileUtils.deleteDirectory(FileStateBackendBasePath);
-		}
 	}
 
 	@Before
 	public void cleanUp() throws Exception {
-		if (FileStateBackendBasePath != null) {
-			FileUtils.cleanDirectory(FileStateBackendBasePath);
-		}
-
 		ZooKeeper.deleteAll();
 	}
 
@@ -125,7 +109,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	@Test
 	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
 		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
 
 		// Configure the cluster
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
@@ -172,7 +156,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	@Test
 	public void testClientNonDetachedListeningBehaviour() throws Exception {
 		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
 
 		// Test actor system
 		ActorSystem testSystem = null;
@@ -397,10 +381,10 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	/**
 	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
 	 */
-	private static void verifyCleanRecoveryState(Configuration config) throws Exception {
+	private void verifyCleanRecoveryState(Configuration config) throws Exception {
 		// File state backend empty
 		Collection<File> stateHandles = FileUtils.listFiles(
-				FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
 
 		if (!stateHandles.isEmpty()) {
 			fail("File state backend is not clean: " + stateHandles);
@@ -429,10 +413,10 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	/**
 	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
 	 */
-	private static void verifyRecoveryState(Configuration config) throws Exception {
+	private void verifyRecoveryState(Configuration config) throws Exception {
 		// File state backend empty
 		Collection<File> stateHandles = FileUtils.listFiles(
-			FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
 
 		if (stateHandles.isEmpty()) {
 			fail("File state backend has been cleaned: " + stateHandles);


[7/8] flink git commit: [FLINK-5376] Fix log statement in UnorderedStreamElementQueue

Posted by ch...@apache.org.
[FLINK-5376] Fix log statement in UnorderedStreamElementQueue

This closes #3948.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed65c253
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed65c253
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed65c253

Branch: refs/heads/master
Commit: ed65c253478e688fdfbb149a1a75dc01b8537cab
Parents: 6ad3d14
Author: zentol <ch...@apache.org>
Authored: Fri May 19 14:33:18 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 .../api/operators/async/queue/UnorderedStreamElementQueue.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed65c253/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index c9dc358..e6f71bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -148,7 +148,7 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
 				hasCompletedEntries.await();
 			}
 
-			LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
+			LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
 				"({}/{}).", numberEntries, capacity);
 
 			return completedQueue.peek();


[3/8] flink git commit: [FLINK-6432] [py] Activate strict checkstyle for flink-python

Posted by ch...@apache.org.
[FLINK-6432] [py] Activate strict checkstyle for flink-python

This closes #3969.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04fae536
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04fae536
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04fae536

Branch: refs/heads/master
Commit: 04fae5362bdc933c111c92a9cc3b3b2c1d71850e
Parents: ce573c6
Author: zentol <ch...@apache.org>
Authored: Tue May 2 18:03:48 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-python/pom.xml            | 35 +++++++++++++++
 .../flink/python/api/PythonOperationInfo.java   | 14 ++++--
 .../apache/flink/python/api/PythonOptions.java  |  5 ++-
 .../flink/python/api/PythonPlanBinder.java      |  6 ++-
 .../python/api/functions/PythonCoGroup.java     |  6 ++-
 .../api/functions/PythonMapPartition.java       |  4 +-
 .../api/functions/util/IdentityGroupReduce.java | 10 +++--
 .../python/api/functions/util/KeyDiscarder.java |  7 +--
 .../api/functions/util/NestedKeyDiscarder.java  |  8 ++--
 .../api/functions/util/SerializerMap.java       |  7 +--
 .../functions/util/StringDeserializerMap.java   |  7 +--
 .../util/StringTupleDeserializerMap.java        |  7 +--
 .../streaming/data/PythonDualInputSender.java   |  1 +
 .../streaming/data/PythonDualInputStreamer.java |  1 +
 .../api/streaming/data/PythonReceiver.java      | 13 +++---
 .../python/api/streaming/data/PythonSender.java | 19 +++++++--
 .../streaming/data/PythonSingleInputSender.java |  1 +
 .../data/PythonSingleInputStreamer.java         |  1 +
 .../api/streaming/data/PythonStreamer.java      |  2 +
 .../data/SingleElementPushBackIterator.java     |  1 +
 .../api/streaming/plan/PythonPlanReceiver.java  | 12 +++---
 .../api/streaming/plan/PythonPlanSender.java    |  4 +-
 .../api/streaming/plan/PythonPlanStreamer.java  | 10 +++--
 .../api/streaming/util/SerializationUtils.java  | 45 ++++++++++++++++++++
 .../api/streaming/util/StreamPrinter.java       |  1 +
 .../python/api/types/CustomTypeWrapper.java     |  1 +
 .../apache/flink/python/api/util/SetCache.java  |  1 +
 .../flink/python/api/PythonPlanBinderTest.java  |  6 ++-
 .../data/SingleElementPushBackIteratorTest.java |  4 ++
 29 files changed, 189 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml
index 6a57c3e..f6cea0c 100644
--- a/flink-libraries/flink-python/pom.xml
+++ b/flink-libraries/flink-python/pom.xml
@@ -48,6 +48,41 @@ under the License.
                     </archive>
                 </configuration>
             </plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 694c1b4..42eeffb 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -10,17 +10,23 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api;
 
-import java.io.IOException;
-import java.util.Arrays;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
+
+/**
+ * Generic container for all information required to an operation to the DataSet API.
+ */
 public class PythonOperationInfo {
 	public String identifier;
 	public int parentID; //DataSet that an operation is applied on
@@ -121,7 +127,7 @@ public class PythonOperationInfo {
 		return sb.toString();
 	}
 
-	public enum DatasizeHint {
+	enum DatasizeHint {
 		NONE,
 		TINY,
 		HUGE

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
index de053a0..4137c11 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api;
 
 import org.apache.flink.configuration.ConfigOption;
@@ -40,8 +41,8 @@ public class PythonOptions {
 	 * The config parameter defining the size of the memory-mapped files, in kb.
 	 * This value must be large enough to ensure that the largest serialized record can be written completely into
 	 * the file.
-	 * 
-	 * Every task will allocate 2 memory-files, each with this size.
+	 *
+	 * <p>Every task will allocate 2 memory-files, each with this size.
 	 */
 	public static final ConfigOption<Long> MMAP_FILE_SIZE =
 		key("python.mmap.size.kb")

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 2378d60..810c8cd 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api;
 
 import org.apache.flink.api.common.JobExecutionResult;
@@ -44,6 +45,7 @@ import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.python.api.util.SetCache;
 import org.apache.flink.runtime.filecache.FileCache;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,9 +106,9 @@ public class PythonPlanBinder {
 		tmpPlanFilesDir = configuredPlanTmpPath != null
 			? configuredPlanTmpPath
 			: System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
-		
+
 		tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
-		
+
 		String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
 		pythonLibraryPath = flinkRootDir != null
 				//command-line

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index a5e3e75..eaec88525 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -10,16 +10,18 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions;
 
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer;
 import org.apache.flink.util.Collector;
+
 import java.io.IOException;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * CoGroupFunction that uses a python script.

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 207ead9..924538e 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -10,9 +10,9 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions;
 
-import java.io.IOException;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -21,6 +21,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.api.streaming.data.PythonSingleInputStreamer;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+
 /**
  * Multi-purpose class, usable by all operations using a python script with one input source and possibly differing
  * in-/output types.

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
index 32fd22a..6d82e34 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
@@ -10,15 +10,17 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions.util;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.util.Collector;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 
-/*
-Utility function to group and sort data.
-*/
+/**
+ * Utility function to group and sort data.
+ * @param <IN> input type
+ */
 @ForwardedFields("*->*")
 public class IdentityGroupReduce<IN> implements GroupReduceFunction<IN, IN> {
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
index b2af7be..985eee5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
@@ -10,15 +10,16 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-/*
-Utility function to extract the value from a Key-Value Tuple.
-*/
+/**
+ * Utility function to extract the value from a Key-Value Tuple.
+ */
 @ForwardedFields("f1->*")
 public class KeyDiscarder <T> implements MapFunction<Tuple2<T, byte[]>, byte[]> {
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
index 4c8511e..a2c9e52 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -17,9 +18,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-/*
-Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin.
-*/
+/**
+ * Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin.
+ * @param <IN> input type
+ */
 @ForwardedFields("f0.f1->f0; f1.f1->f1")
 public class NestedKeyDiscarder<IN> implements MapFunction<IN, Tuple2<byte[], byte[]>> {
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
index 116efd4..aaa7544 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
@@ -10,15 +10,16 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.python.api.streaming.util.SerializationUtils;
 import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;
 
-/*
-Utility function to serialize values, usually directly from data sources.
-*/
+/**
+ * Utility function to serialize values, usually directly from data sources.
+ */
 public class SerializerMap<IN> implements MapFunction<IN, byte[]> {
 	private transient Serializer<IN> serializer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
index 3d79b08..c72b4be 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
@@ -10,14 +10,15 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.configuration.ConfigConstants;
 
-/*
-Utility function to deserialize strings, used for non-CSV sinks.
-*/
+/**
+ * Utility function to deserialize strings, used for non-CSV sinks.
+ */
 public class StringDeserializerMap implements MapFunction<byte[], String> {
 	@Override
 	public String map(byte[] value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
index af5eac6..48082ac 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
@@ -10,15 +10,16 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.ConfigConstants;
 
-/*
-Utility function to deserialize strings, used for CSV sinks.
-*/
+/**
+ * Utility function to deserialize strings, used for CSV sinks.
+ */
 public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<String>> {
 	@Override
 	public Tuple1<String> map(byte[] value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
index a16f522..de129b1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
index 8c9fde9..bc3e546 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index c7c1f7a..d8436c7 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -10,20 +10,21 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
+import org.apache.flink.util.Collector;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.Serializable;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonOptions;
-import org.apache.flink.util.Collector;
 
 /**
  * This class is used to read data from memory-mapped files.

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index 3d13271..2739fb1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.java.tuple.Tuple;
@@ -41,7 +42,7 @@ public abstract class PythonSender implements Serializable {
 	private transient MappedByteBuffer fileBuffer;
 
 	private final long mappedFileSizeBytes;
-	
+
 	private final Configuration config;
 
 	protected PythonSender(Configuration config) {
@@ -59,7 +60,6 @@ public abstract class PythonSender implements Serializable {
 		outputFile.createNewFile();
 		outputRAF = new RandomAccessFile(outputFile, "rw");
 
-		
 		outputRAF.setLength(mappedFileSizeBytes);
 		outputRAF.seek(mappedFileSizeBytes - 1);
 		outputRAF.writeByte(0);
@@ -125,16 +125,29 @@ public abstract class PythonSender implements Serializable {
 		throw new IllegalArgumentException("This object can't be serialized: " + value);
 	}
 
+	/**
+	 * Interface for all serializers used by {@link PythonSender} classes to write container objects.
+	 *
+	 * <p>These serializers must be kept in sync with the python counterparts.
+	 *
+	 * @param <T> input type
+	 */
 	protected abstract static class Serializer<T> {
 		protected ByteBuffer buffer;
 
+		/**
+		 * Serializes the given value into a {@link ByteBuffer}.
+		 *
+		 * @param value value to serialize
+		 * @return ByteBuffer containing serialized record
+		 */
 		public ByteBuffer serialize(T value) {
 			serializeInternal(value);
 			buffer.flip();
 			return buffer;
 		}
 
-		public abstract void serializeInternal(T value);
+		protected abstract void serializeInternal(T value);
 	}
 
 	private static class ArraySerializer extends Serializer<byte[]> {

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
index 74d0604..af093f1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
index 6c0a13c..d596c39 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index cc4ba43..3fec947 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -20,6 +21,7 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializ
 import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
index ef80c98..ea10b1d 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index 6276302..a90f581 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -10,12 +10,17 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.plan;
 
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.flink.api.java.tuple.Tuple;
+
 import static org.apache.flink.python.api.streaming.data.PythonReceiver.createTuple;
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BOOLEAN;
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BYTE;
@@ -27,9 +32,6 @@ import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_NULL;
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_STRING;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.python.api.types.CustomTypeWrapper;
-
 /**
  * Instances of this class can be used to receive data from the plan process.
  */
@@ -82,7 +84,7 @@ public class PythonPlanReceiver {
 	}
 
 	private abstract static class Deserializer<T> {
-		
+
 		public T deserialize() throws IOException {
 			return deserialize(false);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 331c67e..db78b8c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -10,12 +10,14 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.plan;
 
+import org.apache.flink.python.api.streaming.util.SerializationUtils;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import org.apache.flink.python.api.streaming.util.SerializationUtils;
 
 /**
  * Instances of this class can be used to send data to the plan process.

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 9e93dda..d25f3d5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -10,12 +10,14 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.plan;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.api.PythonOptions;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +42,7 @@ public class PythonPlanStreamer {
 	private Process process;
 	private ServerSocket server;
 	private Socket socket;
-	
+
 	public PythonPlanStreamer(Configuration config) {
 		this.config = config;
 	}
@@ -90,7 +92,7 @@ public class PythonPlanStreamer {
 			return false;
 		}
 		while (true) {
-			try {		
+			try {
 				socket = server.accept();
 				sender = new PythonPlanSender(socket.getOutputStream());
 				receiver = new PythonPlanReceiver(socket.getInputStream());
@@ -107,7 +109,7 @@ public class PythonPlanStreamer {
 			}
 		}
 	}
-	
+
 	public void finishPlanMode() {
 		try {
 			socket.close();
@@ -143,7 +145,7 @@ public class PythonPlanStreamer {
 			return ProcessState.RUNNING;
 		}
 	}
-	
+
 	private enum ProcessState {
 		RUNNING,
 		FAILED,

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index 721746a..1fe3ba1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.util;
 
 import org.apache.flink.api.java.tuple.Tuple;
@@ -19,6 +20,9 @@ import org.apache.flink.python.api.types.CustomTypeWrapper;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
+/**
+ * Utility class containing serializers for all supported types.
+ */
 public class SerializationUtils {
 	public static final byte TYPE_BOOLEAN = 34;
 	public static final byte TYPE_BYTE = 33;
@@ -85,6 +89,14 @@ public class SerializationUtils {
 		return (Serializer<IN>) serializer;
 	}
 
+	/**
+	 * Super class for all serializers used to serialize data. These serializers are used to serialize values emitted
+	 * from java input formats.
+	 *
+	 * <p>These serializers must be kept in sync with the python counterparts.
+	 *
+	 * @param <IN> input type
+	 */
 	public abstract static class Serializer<IN> {
 		private byte[] typeInfo = null;
 
@@ -109,6 +121,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for {@link CustomTypeWrapper CustomTypeWrappers}.
+	 */
 	public static class CustomTypeWrapperSerializer extends Serializer<CustomTypeWrapper> {
 		private final byte type;
 
@@ -129,6 +144,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for bytes.
+	 */
 	public static class ByteSerializer extends Serializer<Byte> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Byte value) {
@@ -141,6 +159,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for booleans.
+	 */
 	public static class BooleanSerializer extends Serializer<Boolean> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Boolean value) {
@@ -153,6 +174,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for ints.
+	 */
 	public static class IntSerializer extends Serializer<Integer> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Integer value) {
@@ -167,6 +191,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for longs.
+	 */
 	public static class LongSerializer extends Serializer<Long> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Long value) {
@@ -181,6 +208,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for strings.
+	 */
 	public static class StringSerializer extends Serializer<String> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(String value) {
@@ -196,6 +226,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for floats.
+	 */
 	public static class FloatSerializer extends Serializer<Float> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Float value) {
@@ -210,6 +243,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for doubles.
+	 */
 	public static class DoubleSerializer extends Serializer<Double> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Double value) {
@@ -224,6 +260,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for null.
+	 */
 	public static class NullSerializer extends Serializer<Object> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Object value) {
@@ -236,6 +275,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for byte arrays.
+	 */
 	public static class BytesSerializer extends Serializer<byte[]> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(byte[] value) {
@@ -250,6 +292,9 @@ public class SerializationUtils {
 		}
 	}
 
+	/**
+	 * A serializer for tuples.
+	 */
 	public static class TupleSerializer extends Serializer<Tuple> {
 		private final Serializer<Object>[] serializer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
index c6a1ede..313b983 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.util;
 
 import org.apache.flink.configuration.ConfigConstants;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
index e16c3eb..f5674d5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api.types;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
index 750ba63..48011fc 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api.util;
 
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 20f3503..701ac73 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -10,6 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.flink.python.api;
 
 import org.apache.flink.configuration.Configuration;
@@ -23,8 +24,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Tests for the PythonPlanBinder.
+ */
 public class PythonPlanBinderTest extends JavaProgramTestBase {
-	
+
 	@Override
 	protected boolean skipCollectionExecution() {
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
index 5e9eb42..a41aac3 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.python.api.streaming.data;
 
 import org.junit.Assert;
@@ -24,6 +25,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
+/**
+ * Tests for the SingleElementPushBackIterator.
+ */
 public class SingleElementPushBackIteratorTest {
 
 	@Test


[5/8] flink git commit: [FLINK-6431] [metrics] Activate strict checkstyle in flink-metrics

Posted by ch...@apache.org.
[FLINK-6431] [metrics] Activate strict checkstyle in flink-metrics

This closes #3968.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce573c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce573c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce573c65

Branch: refs/heads/master
Commit: ce573c65e2573ae12c8f6a76cc580445886a0a74
Parents: 61914ab
Author: zentol <ch...@apache.org>
Authored: Tue May 2 17:31:27 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/metrics/CharacterFilter.java   |   2 +-
 .../org/apache/flink/metrics/Histogram.java     |   2 +-
 .../flink/metrics/HistogramStatistics.java      |   6 +-
 .../org/apache/flink/metrics/MeterView.java     |  23 +-
 .../org/apache/flink/metrics/MetricConfig.java  |   1 +
 .../org/apache/flink/metrics/MetricGroup.java   |  20 +-
 .../org/apache/flink/metrics/SimpleCounter.java |   3 +-
 .../java/org/apache/flink/metrics/View.java     |   3 +-
 .../metrics/reporter/AbstractReporter.java      |   1 +
 .../flink/metrics/reporter/MetricReporter.java  |   4 +-
 .../apache/flink/metrics/util/TestMeter.java    |   3 +
 .../org/apache/flink/metrics/MeterViewTest.java |   4 +
 .../apache/flink/metrics/datadog/DCounter.java  |   8 +-
 .../apache/flink/metrics/datadog/DGauge.java    |   9 +-
 .../apache/flink/metrics/datadog/DMeter.java    |   6 +-
 .../apache/flink/metrics/datadog/DMetric.java   |   8 +-
 .../apache/flink/metrics/datadog/DSeries.java   |   8 +-
 .../metrics/datadog/DatadogHttpClient.java      |   8 +-
 .../metrics/datadog/DatadogHttpReporter.java    |  31 +-
 .../flink/metrics/datadog/MetricType.java       |   8 +-
 .../metrics/datadog/DatadogHttpClientTest.java  | 317 +++++++++----------
 .../dropwizard/ScheduledDropwizardReporter.java |  11 +-
 .../metrics/DropwizardHistogramStatistics.java  |   3 +-
 .../dropwizard/metrics/FlinkCounterWrapper.java |   4 +
 .../dropwizard/metrics/FlinkGaugeWrapper.java   |   5 +-
 .../metrics/FlinkHistogramWrapper.java          |   3 +-
 .../dropwizard/metrics/FlinkMeterWrapper.java   |   3 +-
 .../metrics/HistogramStatisticsWrapper.java     |   6 +-
 .../ScheduledDropwizardReporterTest.java        |  10 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |  27 +-
 .../metrics/DropwizardMeterWrapperTest.java     |   3 +
 .../metrics/FlinkMeterWrapperTest.java          |   4 +
 .../flink/metrics/ganglia/GangliaReporter.java  |  15 +-
 .../metrics/graphite/GraphiteReporter.java      |  14 +-
 .../apache/flink/metrics/jmx/JMXReporter.java   |  46 ++-
 .../flink/metrics/jmx/JMXReporterTest.java      |  13 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |   9 +
 .../flink/metrics/statsd/StatsDReporter.java    |  18 +-
 .../metrics/statsd/StatsDReporterTest.java      |  17 +-
 flink-metrics/pom.xml                           |  39 +++
 40 files changed, 418 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
index 1e9fbc4..10cd9ce 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
@@ -27,7 +27,7 @@ public interface CharacterFilter {
 	/**
 	 * Filter the given string and generate a resulting string from it.
 	 *
-	 * For example, one implementation could filter out invalid characters from the input string.
+	 * <p>For example, one implementation could filter out invalid characters from the input string.
 	 *
 	 * @param input Input string
 	 * @return Filtered result string

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
index af5c9b0..070a52a 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -21,7 +21,7 @@ package org.apache.flink.metrics;
 /**
  * Histogram interface to be used with Flink's metrics system.
  *
- * The histogram allows to record values, get the current count of recorded values and create
+ * <p>The histogram allows to record values, get the current count of recorded values and create
  * histogram statistics for the currently seen elements.
  */
 public interface Histogram extends Metric {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
index b2e4507..7a4b0a9 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
@@ -21,7 +21,7 @@ package org.apache.flink.metrics;
 /**
  * Histogram statistics represent the current snapshot of elements recorded in the histogram.
  *
- * The histogram statistics allow to calculate values for quantiles, the mean, the standard
+ * <p>The histogram statistics allow to calculate values for quantiles, the mean, the standard
  * deviation, the minimum and the maximum.
  */
 public abstract class HistogramStatistics {
@@ -35,14 +35,14 @@ public abstract class HistogramStatistics {
 	public abstract double getQuantile(double quantile);
 
 	/**
-	 * Returns the elements of the statistics' sample
+	 * Returns the elements of the statistics' sample.
 	 *
 	 * @return Elements of the statistics' sample
 	 */
 	public abstract long[] getValues();
 
 	/**
-	 * Returns the size of the statistics' sample
+	 * Returns the size of the statistics' sample.
 	 *
 	 * @return Size of the statistics' sample
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
index b87b983..8df0e86 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
@@ -15,32 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 /**
  * A MeterView provides an average rate of events per second over a given time period.
- * 
- * The primary advantage of this class is that the rate is neither updated by the computing thread nor for every event.
+ *
+ * <p>The primary advantage of this class is that the rate is neither updated by the computing thread nor for every event.
  * Instead, a history of counts is maintained that is updated in regular intervals by a background thread. From this
  * history a rate is derived on demand, which represents the average rate of events over the given time span.
- * 
- * Setting the time span to a low value reduces memory-consumption and will more accurately report short-term changes.
+ *
+ * <p>Setting the time span to a low value reduces memory-consumption and will more accurately report short-term changes.
  * The minimum value possible is {@link View#UPDATE_INTERVAL_SECONDS}.
  * A high value in turn increases memory-consumption, since a longer history has to be maintained, but will result in
  * smoother transitions between rates.
- * 
- * The events are counted by a {@link Counter}.
+ *
+ * <p>The events are counted by a {@link Counter}.
  */
 public class MeterView implements Meter, View {
-	/** The underlying counter maintaining the count */
+	/** The underlying counter maintaining the count. */
 	private final Counter counter;
-	/** The time-span over which the average is calculated */
+	/** The time-span over which the average is calculated. */
 	private final int timeSpanInSeconds;
-	/** Circular array containing the history of values */
+	/** Circular array containing the history of values. */
 	private final long[] values;
-	/** The index in the array for the current time */
+	/** The index in the array for the current time. */
 	private int time = 0;
-	/** The last rate we computed */
+	/** The last rate we computed. */
 	private double currentRate = 0;
 
 	public MeterView(int timeSpanInSeconds) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
index 4a2e616..699afdf 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index d4221ef..39ab3b6 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -22,10 +22,10 @@ import java.util.Map;
 
 /**
  * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
- * 
+ *
  * <p>Instances of this class can be used to register new metrics with Flink and to create a nested
  * hierarchy based on the group names.
- * 
+ *
  * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name.
  */
 public interface MetricGroup {
@@ -69,7 +69,7 @@ public interface MetricGroup {
 	 * @return the given counter
 	 */
 	<C extends Counter> C counter(String name, C counter);
-	
+
 	/**
 	 * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
 	 *
@@ -95,7 +95,7 @@ public interface MetricGroup {
 	 *
 	 * @param name name of the histogram
 	 * @param histogram histogram to register
-	 * @param <H> histogram type   
+	 * @param <H> histogram type
 	 * @return the registered histogram
 	 */
 	<H extends Histogram> H histogram(String name, H histogram);
@@ -105,7 +105,7 @@ public interface MetricGroup {
 	 *
 	 * @param name name of the histogram
 	 * @param histogram histogram to register
-	 * @param <H> histogram type   
+	 * @param <H> histogram type
 	 * @return the registered histogram
 	 */
 	<H extends Histogram> H histogram(int name, H histogram);
@@ -156,7 +156,7 @@ public interface MetricGroup {
 
 	/**
 	 * Gets the scope as an array of the scope components, for example
-	 * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
+	 * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}.
 	 *
 	 * @see #getMetricIdentifier(String)
 	 * @see #getMetricIdentifier(String, CharacterFilter)
@@ -165,15 +165,15 @@ public interface MetricGroup {
 
 	/**
 	 * Returns a map of all variables and their associated value, for example
-	 * {@code {"<host>"="host-7", "<tm_id>"="taskmanager-2"}}
-	 * 
+	 * {@code {"<host>"="host-7", "<tm_id>"="taskmanager-2"}}.
+	 *
 	 * @return map of all variables and their associated value
      */
 	Map<String, String> getAllVariables();
 
 	/**
 	 * Returns the fully qualified metric name, for example
-	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
 	 *
 	 * @param metricName metric name
 	 * @return fully qualified metric name
@@ -182,7 +182,7 @@ public interface MetricGroup {
 
 	/**
 	 * Returns the fully qualified metric name, for example
-	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
 	 *
 	 * @param metricName metric name
 	 * @param filter character filter which is applied to the scope components if not null.

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
index 6ec3b28..2efc0c9 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 /**
@@ -22,7 +23,7 @@ package org.apache.flink.metrics;
  */
 public class SimpleCounter implements Counter {
 
-	/** the current count */
+	/** the current count. */
 	private long count;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
index 1780130..2bbb3a9 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 /**
  * An interface for metrics which should be updated in regular intervals by a background thread.
  */
 public interface View {
-	/** The interval in which metrics are updated */
+	/** The interval in which metrics are updated. */
 	int UPDATE_INTERVAL_SECONDS = 5;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index 0c8d9ad..c0aeb4b 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
index ee92a10..5c8085f 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -24,7 +24,7 @@ import org.apache.flink.metrics.MetricGroup;
 
 /**
  * Reporters are used to export {@link Metric Metrics} to an external backend.
- * 
+ *
  * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
  * public no-argument constructor.
  */
@@ -37,7 +37,7 @@ public interface MetricReporter {
 	/**
 	 * Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
 	 * this method is the place where the reporters set their basic fields based on configuration values.
-	 * 
+	 *
 	 * <p>This method is always called first on a newly instantiated reporter.
 	 *
 	 * @param config A properties object that contains all parameters set for this reporter.

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
index b44b996..b1ec3a3 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
@@ -20,6 +20,9 @@ package org.apache.flink.metrics.util;
 
 import org.apache.flink.metrics.Meter;
 
+/**
+ * A dummy {@link Meter} implementation.
+ */
 public class TestMeter implements Meter {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
index 8ba298f..a7a63b0 100644
--- a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
+++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
@@ -15,12 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the MeterView.
+ */
 public class MeterViewTest {
 	@Test
 	public void testGetCount() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
index 58abbd6..e187efb 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -23,8 +23,8 @@ import org.apache.flink.metrics.Counter;
 import java.util.List;
 
 /**
- * Mapping of counter between Flink and Datadog
- * */
+ * Mapping of counter between Flink and Datadog.
+ */
 public class DCounter extends DMetric {
 	private final Counter counter;
 
@@ -35,8 +35,8 @@ public class DCounter extends DMetric {
 
 	/**
 	 * Visibility of this method must not be changed
-	 * since we deliberately not map it to json object in a Datadog-defined format
-	 * */
+	 * since we deliberately do not map it to a JSON object in a Datadog-defined format.
+	 */
 	@Override
 	public Number getMetricValue() {
 		return counter.getCount();

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
index 8deb117..ba97d59 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.metrics.datadog;
 
-
 import org.apache.flink.metrics.Gauge;
 
 import java.util.List;
 
 /**
- * Mapping of gauge between Flink and Datadog
- * */
+ * Mapping of gauge between Flink and Datadog.
+ */
 public class DGauge extends DMetric {
 	private final Gauge<Number> gauge;
 
@@ -36,8 +35,8 @@ public class DGauge extends DMetric {
 
 	/**
 	 * Visibility of this method must not be changed
-	 * since we deliberately not map it to json object in a Datadog-defined format
-	 * */
+	 * since we deliberately do not map it to a JSON object in a Datadog-defined format.
+	 */
 	@Override
 	public Number getMetricValue() {
 		return gauge.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
index 181a00c..68c61cf 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
@@ -23,10 +23,10 @@ import org.apache.flink.metrics.Meter;
 import java.util.List;
 
 /**
- * Mapping of meter between Flink and Datadog
+ * Mapping of meter between Flink and Datadog.
  *
- * Only consider rate of the meter, due to Datadog HTTP API's limited support of meter
- * */
+ * <p>Only the rate of the meter is considered, due to the Datadog HTTP API's limited support for meters.
+ */
 public class DMeter extends DMetric {
 	private final Meter meter;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
index 3f9d6ff..e55a9f0 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -25,16 +25,16 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Abstract metric of Datadog for serialization
- * */
+ * Abstract metric of Datadog for serialization.
+ */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public abstract class DMetric {
 	private static final long MILLIS_TO_SEC = 1000L;
 
 	/**
 	 * Names of metric/type/tags field and their getters must not be changed
-	 * since they are mapped to json objects in a Datadog-defined format
-	 * */
+	 * since they are mapped to json objects in a Datadog-defined format.
+	 */
 	private final String metric; // Metric name
 	private final MetricType type;
 	private final String host;

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
index fb0bb09..54d907c 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -22,13 +22,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Json serialization between Flink and Datadog
- **/
+ * Json serialization between Flink and Datadog.
+ */
 public class DSeries {
 	/**
 	 * Names of series field and its getters must not be changed
-	 * since they are mapped to json objects in a Datadog-defined format
-	 * */
+	 * since they are mapped to json objects in a Datadog-defined format.
+	 */
 	private List<DMetric> series;
 
 	public DSeries() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
index c94a2b7..086966b 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
@@ -23,15 +23,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
-import okhttp3.Response;
 import okhttp3.RequestBody;
+import okhttp3.Response;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Http client talking to Datadog
- * */
+ * Http client talking to Datadog.
+ */
 public class DatadogHttpClient{
 	private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s";
 	private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s";
@@ -69,7 +69,7 @@ public class DatadogHttpClient{
 				throw new IllegalArgumentException(
 					String.format("API key: %s is invalid", apiKey));
 			}
-		} catch(IOException e) {
+		} catch (IOException e) {
 			throw new IllegalStateException("Failed contacting Datadog to validate API key", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
index fcb5c4b..a47b2bf 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -20,13 +20,14 @@ package org.apache.flink.metrics.datadog;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,12 +37,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-
 /**
- * Metric Reporter for Datadog
+ * Metric Reporter for Datadog.
  *
- * Variables in metrics scope will be sent to Datadog as tags
- * */
+ * <p>Variables in metrics scope will be sent to Datadog as tags.
+ */
 public class DatadogHttpReporter implements MetricReporter, Scheduled {
 	private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class);
 	private static final String HOST_VARIABLE = "<host>";
@@ -146,20 +146,20 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
 	}
 
 	/**
-	 * Get config tags from config 'metrics.reporter.dghttp.tags'
-	 * */
+	 * Get config tags from config 'metrics.reporter.dghttp.tags'.
+	 */
 	private List<String> getTagsFromConfig(String str) {
 		return Arrays.asList(str.split(","));
 	}
 
 	/**
-	 * Get tags from MetricGroup#getAllVariables(), excluding 'host'
-	 * */
+	 * Get tags from MetricGroup#getAllVariables(), excluding 'host'.
+	 */
 	private List<String> getTagsFromMetricGroup(MetricGroup metricGroup) {
 		List<String> tags = new ArrayList<>();
 
 		for (Map.Entry<String, String> entry: metricGroup.getAllVariables().entrySet()) {
-			if(!entry.getKey().equals(HOST_VARIABLE)) {
+			if (!entry.getKey().equals(HOST_VARIABLE)) {
 				tags.add(getVariableName(entry.getKey()) + ":" + entry.getValue());
 			}
 		}
@@ -167,23 +167,20 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
 		return tags;
 	}
 
-	/**
-	 * Get host from MetricGroup#getAllVariables() if it exists; returns Null otherwise
-	 * */
 	private String getHostFromMetricGroup(MetricGroup metricGroup) {
 		return metricGroup.getAllVariables().get(HOST_VARIABLE);
 	}
 
 	/**
-	 * Given "<xxx>", return "xxx"
-	 * */
+	 * Removes leading and trailing angle brackets.
+	 */
 	private String getVariableName(String str) {
 		return str.substring(1, str.length() - 1);
 	}
 
 	/**
-	 * Compact metrics in batch, serialize them, and send to Datadog via HTTP
-	 * */
+	 * Compact metrics in batch, serialize them, and send to Datadog via HTTP.
+	 */
 	static class DatadogHttpRequest {
 		private final DSeries series;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
index 97f9b29..9681514 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
@@ -19,12 +19,12 @@
 package org.apache.flink.metrics.datadog;
 
 /**
- * Metric types supported by Datadog
- * */
+ * Metric types supported by Datadog.
+ */
 public enum MetricType {
 	/**
 	 * Names of 'gauge' and 'counter' must not be changed
-	 * since they are mapped to json objects in a Datadog-defined format
-	 * */
+	 * since they are mapped to json objects in a Datadog-defined format.
+	 */
 	gauge, counter
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
index bda5d47..0680252 100644
--- a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
+++ b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.metrics.datadog;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -35,165 +35,164 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
-@RunWith(Enclosed.class)
+/**
+ * Tests for the DatadogHttpClient.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DMetric.class)
 public class DatadogHttpClientTest {
-	public static class TestApiKey {
-		@Test(expected = IllegalArgumentException.class)
-		public void testClientWithEmptyKey() {
-			new DatadogHttpClient("");
-		}
-
-		@Test(expected = IllegalArgumentException.class)
-		public void testClientWithNullKey() {
-			new DatadogHttpClient(null);
-		}
+
+	private static List<String> tags = Arrays.asList("tag1", "tag2");
+
+	private static final long MOCKED_SYSTEM_MILLIS = 123L;
+
+	@Before
+	public void mockSystemMillis() {
+		PowerMockito.mockStatic(DMetric.class);
+		PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testClientWithEmptyKey() {
+		new DatadogHttpClient("");
 	}
 
-	@RunWith(PowerMockRunner.class)
-	@PrepareForTest(DMetric.class)
-	public static class TestSerialization {
-		private static List<String> tags = Arrays.asList("tag1", "tag2");
-
-		private static final long MOCKED_SYSTEM_MILLIS = 123L;
-
-		@Before
-		public void mockSystemMillis() {
-			PowerMockito.mockStatic(DMetric.class);
-			PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
-		}
-
-		@Test
-		public void serializeGauge() throws JsonProcessingException {
-
-			DGauge g = new DGauge(new Gauge<Number>() {
-				@Override
-				public Number getValue() {
-					return 1;
-				}
-			}, "testCounter", "localhost", tags);
-
-			assertEquals(
-				"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
-				DatadogHttpClient.serialize(g));
-		}
-
-		@Test
-		public void serializeGaugeWithoutHost() throws JsonProcessingException {
-
-			DGauge g = new DGauge(new Gauge<Number>() {
-				@Override
-				public Number getValue() {
-					return 1;
-				}
-			}, "testCounter", null, tags);
-
-			assertEquals(
-				"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
-				DatadogHttpClient.serialize(g));
-		}
-
-		@Test
-		public void serializeCounter() throws JsonProcessingException {
-			DCounter c = new DCounter(new Counter() {
-				@Override
-				public void inc() {}
-
-				@Override
-				public void inc(long n) {}
-
-				@Override
-				public void dec() {}
-
-				@Override
-				public void dec(long n) {}
-
-				@Override
-				public long getCount() {
-					return 1;
-				}
-			}, "testCounter", "localhost", tags);
-
-			assertEquals(
-				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
-				DatadogHttpClient.serialize(c));
-		}
-
-		@Test
-		public void serializeCounterWithoutHost() throws JsonProcessingException {
-			DCounter c = new DCounter(new Counter() {
-				@Override
-				public void inc() {}
-
-				@Override
-				public void inc(long n) {}
-
-				@Override
-				public void dec() {}
-
-				@Override
-				public void dec(long n) {}
-
-				@Override
-				public long getCount() {
-					return 1;
-				}
-			}, "testCounter", null, tags);
-
-			assertEquals(
-				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
-				DatadogHttpClient.serialize(c));
-		}
-
-		@Test
-		public void serializeMeter() throws JsonProcessingException {
-
-			DMeter m = new DMeter(new Meter() {
-				@Override
-				public void markEvent() {}
-
-				@Override
-				public void markEvent(long n) {}
-
-				@Override
-				public double getRate() {
-					return 1;
-				}
-
-				@Override
-				public long getCount() {
-					return 0;
-				}
-			}, "testMeter","localhost", tags);
-
-			assertEquals(
-				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
-				DatadogHttpClient.serialize(m));
-		}
-
-		@Test
-		public void serializeMeterWithoutHost() throws JsonProcessingException {
-
-			DMeter m = new DMeter(new Meter() {
-				@Override
-				public void markEvent() {}
-
-				@Override
-				public void markEvent(long n) {}
-
-				@Override
-				public double getRate() {
-					return 1;
-				}
-
-				@Override
-				public long getCount() {
-					return 0;
-				}
-			}, "testMeter", null, tags);
-
-			assertEquals(
-				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
-				DatadogHttpClient.serialize(m));
-		}
+	@Test(expected = IllegalArgumentException.class)
+	public void testClientWithNullKey() {
+		new DatadogHttpClient(null);
+	}
+
+	@Test
+	public void serializeGauge() throws JsonProcessingException {
+
+		DGauge g = new DGauge(new Gauge<Number>() {
+			@Override
+			public Number getValue() {
+				return 1;
+			}
+		}, "testCounter", "localhost", tags);
+
+		assertEquals(
+			"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+			DatadogHttpClient.serialize(g));
+	}
+
+	@Test
+	public void serializeGaugeWithoutHost() throws JsonProcessingException {
+
+		DGauge g = new DGauge(new Gauge<Number>() {
+			@Override
+			public Number getValue() {
+				return 1;
+			}
+		}, "testCounter", null, tags);
+
+		assertEquals(
+			"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+			DatadogHttpClient.serialize(g));
+	}
+
+	@Test
+	public void serializeCounter() throws JsonProcessingException {
+		DCounter c = new DCounter(new Counter() {
+			@Override
+			public void inc() {}
+
+			@Override
+			public void inc(long n) {}
+
+			@Override
+			public void dec() {}
+
+			@Override
+			public void dec(long n) {}
+
+			@Override
+			public long getCount() {
+				return 1;
+			}
+		}, "testCounter", "localhost", tags);
+
+		assertEquals(
+			"{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+			DatadogHttpClient.serialize(c));
+	}
+
+	@Test
+	public void serializeCounterWithoutHost() throws JsonProcessingException {
+		DCounter c = new DCounter(new Counter() {
+			@Override
+			public void inc() {}
+
+			@Override
+			public void inc(long n) {}
+
+			@Override
+			public void dec() {}
+
+			@Override
+			public void dec(long n) {}
+
+			@Override
+			public long getCount() {
+				return 1;
+			}
+		}, "testCounter", null, tags);
+
+		assertEquals(
+			"{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+			DatadogHttpClient.serialize(c));
+	}
+
+	@Test
+	public void serializeMeter() throws JsonProcessingException {
+
+		DMeter m = new DMeter(new Meter() {
+			@Override
+			public void markEvent() {}
+
+			@Override
+			public void markEvent(long n) {}
+
+			@Override
+			public double getRate() {
+				return 1;
+			}
+
+			@Override
+			public long getCount() {
+				return 0;
+			}
+		}, "testMeter", "localhost", tags);
+
+		assertEquals(
+			"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+			DatadogHttpClient.serialize(m));
+	}
+
+	@Test
+	public void serializeMeterWithoutHost() throws JsonProcessingException {
+
+		DMeter m = new DMeter(new Meter() {
+			@Override
+			public void markEvent() {}
+
+			@Override
+			public void markEvent(long n) {}
+
+			@Override
+			public double getRate() {
+				return 1;
+			}
+
+			@Override
+			public long getCount() {
+				return 0;
+			}
+		}, "testMeter", null, tags);
+
+		assertEquals(
+			"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+			DatadogHttpClient.serialize(m));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index 380abc4..e3184da 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.dropwizard;
 
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-import com.codahale.metrics.ScheduledReporter;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
@@ -39,6 +36,10 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+import com.codahale.metrics.ScheduledReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -161,7 +162,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
 	public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
 		synchronized (this) {
 			String fullName;
-			
+
 			if (metric instanceof Counter) {
 				fullName = counters.remove(metric);
 			} else if (metric instanceof Gauge) {
@@ -173,7 +174,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
 			} else {
 				fullName = null;
 			}
-			
+
 			if (fullName != null) {
 				registry.remove(fullName);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
index 6f4eab2..8acf401 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.dropwizard.metrics;
 
-import com.codahale.metrics.Snapshot;
 import org.apache.flink.metrics.HistogramStatistics;
 
+import com.codahale.metrics.Snapshot;
+
 /**
  * Dropwizard histogram statistics implementation returned by {@link DropwizardHistogramWrapper}.
  * The statistics class wraps a {@link Snapshot} instance and forwards the method calls accordingly.

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
index a44c3f5..b4ea8dc 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
@@ -15,10 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.dropwizard.metrics;
 
 import org.apache.flink.metrics.Counter;
 
+/**
+ * A wrapper that allows a Flink counter to be used as a DropWizard counter.
+ */
 public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
 	private final Counter counter;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
index 058ecad..0d53a9e 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
@@ -20,8 +20,11 @@ package org.apache.flink.dropwizard.metrics;
 
 import org.apache.flink.metrics.Gauge;
 
+/**
+ * A wrapper that allows a Flink gauge to be used as a DropWizard gauge.
+ */
 public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
-	
+
 	private final Gauge<T> gauge;
 
 	public FlinkGaugeWrapper(Gauge<T> gauge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
index 8bd8078..d2167d0 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.dropwizard.metrics;
 
-import com.codahale.metrics.Snapshot;
 import org.apache.flink.metrics.Histogram;
 
+import com.codahale.metrics.Snapshot;
+
 /**
  * Wrapper to use a Flink {@link Histogram} as a Dropwizard {@link com.codahale.metrics.Histogram}.
  * This is necessary to report Flink's histograms via the Dropwizard

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
index d0b8483..213c21c 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.dropwizard.metrics;
 
-import com.codahale.metrics.Clock;
 import org.apache.flink.metrics.Meter;
 
+import com.codahale.metrics.Clock;
+
 /**
  * Wrapper to use a Flink {@link Meter} as a Dropwizard {@link com.codahale.metrics.Meter}.
  * This is necessary to report Flink's meters via the Dropwizard

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
index 6d3a69b..7b1ff74 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.dropwizard.metrics;
 
-import com.codahale.metrics.Snapshot;
 import org.apache.flink.metrics.HistogramStatistics;
 
+import com.codahale.metrics.Snapshot;
+
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
@@ -39,6 +40,7 @@ class HistogramStatisticsWrapper extends Snapshot {
 	HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) {
 		this.histogramStatistics = histogramStatistics;
 	}
+
 	@Override
 	public double getValue(double quantile) {
 		return histogramStatistics.getQuantile(quantile);
@@ -76,7 +78,7 @@ class HistogramStatisticsWrapper extends Snapshot {
 
 	@Override
 	public void dump(OutputStream output) {
-		try(PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(output, UTF_8))){
+		try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(output, UTF_8))) {
 
 			for (Long value : histogramStatistics.getValues()) {
 				printWriter.printf("%d%n", value);

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 73e7f0b..85769c2 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.dropwizard;
 
-import com.codahale.metrics.ScheduledReporter;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -40,6 +39,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
+
+import com.codahale.metrics.ScheduledReporter;
 import org.junit.Test;
 
 import java.lang.reflect.InvocationTargetException;
@@ -49,6 +50,9 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the ScheduledDropwizardReporter.
+ */
 public class ScheduledDropwizardReporterTest {
 
 	@Test
@@ -199,7 +203,6 @@ public class ScheduledDropwizardReporterTest {
 		assertEquals(1, rep.getGauges().size());
 		assertEquals(1, rep.registry.getGauges().size());
 
-
 		rep.notifyOfRemovedMetric(c, "counter", mp);
 		assertEquals(0, rep.getCounters().size());
 		assertEquals(0, rep.registry.getCounters().size());
@@ -217,6 +220,9 @@ public class ScheduledDropwizardReporterTest {
 		assertEquals(0, rep.registry.getGauges().size());
 	}
 
+	/**
+	 * Dummy test reporter.
+	 */
 	public static class TestingScheduledDropwizardReporter extends ScheduledDropwizardReporter {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 8f7796c..63765ae 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.dropwizard.metrics;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.SlidingWindowReservoir;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
@@ -36,6 +28,15 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -52,6 +53,9 @@ import java.util.concurrent.TimeoutException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the DropwizardFlinkHistogramWrapper.
+ */
 public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 
 	/**
@@ -72,7 +76,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 		}
 
 		assertEquals(size, histogramWrapper.getStatistics().size());
-		assertEquals((size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
+		assertEquals((size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
 
 		for (int i = size; i < 2 * size; i++) {
 			histogramWrapper.update(i);
@@ -83,7 +87,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 		}
 
 		assertEquals(size, histogramWrapper.getStatistics().size());
-		assertEquals(size + (size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
+		assertEquals(size + (size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
 	}
 
 	/**
@@ -153,6 +157,9 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Test reporter.
+	 */
 	public static class TestingReporter extends ScheduledDropwizardReporter {
 		TestingScheduledReporter scheduledReporter = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
index 0b8fa52..baab773 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the DropwizardMeterWrapper.
+ */
 public class DropwizardMeterWrapperTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
index b6389c5..2298e0f 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
@@ -20,12 +20,16 @@ package org.apache.flink.dropwizard.metrics;
 
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.util.TestMeter;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for the FlinkMeterWrapper.
+ */
 public class FlinkMeterWrapperTest {
 
 	private static final double DELTA = 0.0001;

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
index de9da74..8719901 100644
--- a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
+++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -18,20 +18,23 @@
 
 package org.apache.flink.metrics.ganglia;
 
-import com.codahale.metrics.ScheduledReporter;
-
-import info.ganglia.gmetric4j.gmetric.GMetric;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 
+import com.codahale.metrics.ScheduledReporter;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This class acts as a factory for the {@link com.codahale.metrics.ganglia.GangliaReporter} and allows using it as a
+ * Flink reporter.
+ */
 @PublicEvolving
 public class GangliaReporter extends ScheduledDropwizardReporter {
-	
+
 	public static final String ARG_DMAX = "dmax";
 	public static final String ARG_TMAX = "tmax";
 	public static final String ARG_TTL = "ttl";
@@ -72,7 +75,7 @@ public class GangliaReporter extends ScheduledDropwizardReporter {
 			builder.withTMax(tMax);
 
 			log.info("Configured GangliaReporter with {host:{}, port:{}, dmax:{}, tmax:{}, ttl:{}, addressingMode:{}}",
-				host, port, dMax, tMax, ttl, addressingMode);			
+				host, port, dMax, tMax, ttl, addressingMode);
 			return builder.build(gMetric);
 		} catch (IOException e) {
 			throw new RuntimeException("Error while instantiating GangliaReporter.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
index 47a9d87..3338af2 100644
--- a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.metrics.graphite;
 
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.graphite.Graphite;
-
-import com.codahale.metrics.graphite.GraphiteUDP;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteUDP;
+
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This class acts as a factory for the {@link com.codahale.metrics.graphite.GraphiteReporter} and allows using it as a
+ * Flink reporter.
+ */
 @PublicEvolving
 public class GraphiteReporter extends ScheduledDropwizardReporter {
 
@@ -78,7 +82,7 @@ public class GraphiteReporter extends ScheduledDropwizardReporter {
 		log.info("Configured GraphiteReporter with {host:{}, port:{}, protocol:{}}", host, port, prot);
 		switch(prot) {
 			case UDP:
-				return builder.build(new GraphiteUDP(host, port));				
+				return builder.build(new GraphiteUDP(host, port));
 			case TCP:
 			default:
 				return builder.build(new Graphite(host, port));

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index f0c0fcb..1cc7d38 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.util.NetUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +43,7 @@ import javax.management.ObjectName;
 import javax.management.remote.JMXConnectorServer;
 import javax.management.remote.JMXConnectorServerFactory;
 import javax.management.remote.JMXServiceURL;
+
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.MalformedURLException;
@@ -57,7 +59,7 @@ import java.util.Map;
 /**
  * {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
  *
- * Largely based on the JmxReporter class of the dropwizard metrics library
+ * <p>Largely based on the JmxReporter class of the dropwizard metrics library
  * https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
  */
 public class JMXReporter implements MetricReporter {
@@ -77,18 +79,15 @@ public class JMXReporter implements MetricReporter {
 
 	// ------------------------------------------------------------------------
 
-	/** The server where the management beans are registered and deregistered */
+	/** The server where the management beans are registered and deregistered. */
 	private final MBeanServer mBeanServer;
 
-	/** The names under which the registered metrics have been added to the MBeanServer */ 
+	/** The names under which the registered metrics have been added to the MBeanServer. */
 	private final Map<Metric, ObjectName> registeredMetrics;
 
 	/** The server to which JMX clients connect. Allows for better control over port usage. */
 	private JMXServer jmxServer;
 
-	/**
-	 * Creates a new JMXReporter
-	 */
 	public JMXReporter() {
 		this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
 		this.registeredMetrics = new HashMap<>();
@@ -140,7 +139,7 @@ public class JMXReporter implements MetricReporter {
 			}
 		}
 	}
-	
+
 	public int getPort() {
 		if (jmxServer == null) {
 			throw new NullPointerException("No server was opened. Did you specify a port?");
@@ -220,7 +219,7 @@ public class JMXReporter implements MetricReporter {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Utilities 
+	//  Utilities
 	// ------------------------------------------------------------------------
 
 	static Hashtable<String, String> generateJmxTable(Map<String, String> variables) {
@@ -239,9 +238,9 @@ public class JMXReporter implements MetricReporter {
 	 * Lightweight method to replace unsupported characters.
 	 * If the string does not contain any unsupported characters, this method creates no
 	 * new string (and in fact no new objects at all).
-	 * 
+	 *
 	 * <p>Replacements:
-	 * 
+	 *
 	 * <ul>
 	 *     <li>{@code "} is removed</li>
 	 *     <li>{@code space} is replaced by {@code _} (underscore)</li>
@@ -252,7 +251,7 @@ public class JMXReporter implements MetricReporter {
 		char[] chars = null;
 		final int strLen = str.length();
 		int pos = 0;
-		
+
 		for (int i = 0; i < strLen; i++) {
 			final char c = str.charAt(i);
 			switch (c) {
@@ -271,7 +270,7 @@ public class JMXReporter implements MetricReporter {
 					}
 					chars[pos++] = '_';
 					break;
-				
+
 				case ',':
 				case '=':
 				case ';':
@@ -292,18 +291,24 @@ public class JMXReporter implements MetricReporter {
 					pos++;
 			}
 		}
-		
+
 		return chars == null ? str : new String(chars, 0, pos);
 	}
 
 	// ------------------------------------------------------------------------
-	//  Interfaces and base classes for JMX beans 
+	//  Interfaces and base classes for JMX beans
 	// ------------------------------------------------------------------------
 
+	/**
+	 * The common MBean interface for all metrics.
+	 */
 	public interface MetricMBean {}
 
 	private abstract static class AbstractBean implements MetricMBean {}
 
+	/**
+	 * The MBean interface for an exposed counter.
+	 */
 	public interface JmxCounterMBean extends MetricMBean {
 		long getCount();
 	}
@@ -321,6 +326,9 @@ public class JMXReporter implements MetricReporter {
 		}
 	}
 
+	/**
+	 * The MBean interface for an exposed gauge.
+	 */
 	public interface JmxGaugeMBean extends MetricMBean {
 		Object getValue();
 	}
@@ -339,6 +347,9 @@ public class JMXReporter implements MetricReporter {
 		}
 	}
 
+	/**
+	 * The MBean interface for an exposed histogram.
+	 */
 	public interface JmxHistogramMBean extends MetricMBean {
 		long getCount();
 
@@ -427,6 +438,9 @@ public class JMXReporter implements MetricReporter {
 		}
 	}
 
+	/**
+	 * The MBean interface for an exposed meter.
+	 */
 	public interface JmxMeterMBean extends MetricMBean {
 		double getRate();
 
@@ -455,9 +469,9 @@ public class JMXReporter implements MetricReporter {
 	/**
 	 * JMX Server implementation that JMX clients can connect to.
 	 *
-	 * Heavily based on j256 simplejmx project
+	 * <p>Heavily based on j256 simplejmx project
 	 *
-	 * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+	 * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
 	 */
 	private static class JMXServer {
 		private Registry rmiRegistry;

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 85ab897..f10769a 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import javax.management.MBeanAttributeInfo;
@@ -41,6 +42,7 @@ import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
+
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -51,6 +53,9 @@ import static org.apache.flink.metrics.jmx.JMXReporter.JMX_DOMAIN_PREFIX;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the JMXReporter.
+ */
 public class JMXReporterTest extends TestLogger {
 
 	@Test
@@ -140,7 +145,7 @@ public class JMXReporterTest extends TestLogger {
 
 		rep1.notifyOfRemovedMetric(g1, "rep1", null);
 		rep1.notifyOfRemovedMetric(g2, "rep2", null);
-		
+
 		mg.close();
 		reg.shutdown();
 	}
@@ -194,7 +199,7 @@ public class JMXReporterTest extends TestLogger {
 		ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
 		ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
 
-		JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi");
+		JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jmxrmi");
 		JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
 		MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
 
@@ -203,7 +208,7 @@ public class JMXReporterTest extends TestLogger {
 
 		jmxCon1.close();
 
-		JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jmxrmi");
+		JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jmxrmi");
 		JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
 		MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 6d54db3..6b55eeb 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+
 import org.junit.Assert;
 import org.junit.Test;
 import scala.concurrent.Await;
@@ -38,6 +40,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+
 import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Set;
@@ -45,6 +48,9 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests to verify JMX reporter functionality on the JobManager.
+ */
 public class JMXJobManagerMetricTest {
 	/**
 	 * Tests that metrics registered on the JobManager are actually accessible via JMX.
@@ -102,6 +108,9 @@ public class JMXJobManagerMetricTest {
 		}
 	}
 
+	/**
+	 * Utility to block/unblock a task.
+	 */
 	public static class BlockingInvokable extends AbstractInvokable {
 		private static boolean blocking = true;
 		private static final Object lock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 113107f..527f9c1 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -42,20 +42,19 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
- * Largely based on the StatsDReporter class by ReadyTalk
- * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ * Largely based on the StatsDReporter class by ReadyTalk.
  *
- * Ported since it was not present in maven central.
+ * <p>https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ *
+ * <p>Ported since it was not present in maven central.
  */
 @PublicEvolving
 public class StatsDReporter extends AbstractReporter implements Scheduled {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
 
 	public static final String ARG_HOST = "host";
 	public static final String ARG_PORT = "port";
-//	public static final String ARG_CONVERSION_RATE = "rateConversion";
-//	public static final String ARG_CONVERSION_DURATION = "durationConversion";
 
 	private boolean closed = false;
 
@@ -73,11 +72,6 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 
 		this.address = new InetSocketAddress(host, port);
 
-//		String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
-//		String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
-//		this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
-//		this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
-
 		try {
 			this.socket = new DatagramSocket(0);
 		} catch (SocketException e) {
@@ -131,7 +125,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	private void reportCounter(final String name, final Counter counter) {
 		send(name, String.valueOf(counter.getCount()));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 9215c3f..7d063e7 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -25,10 +25,10 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
-import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -53,6 +54,9 @@ import java.util.concurrent.TimeoutException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the StatsDReporter.
+ */
 public class StatsDReporterTest extends TestLogger {
 
 	@Test
@@ -124,7 +128,7 @@ public class StatsDReporterTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that histograms are properly reported via the StatsD reporter
+	 * Tests that histograms are properly reported via the StatsD reporter.
 	 */
 	@Test
 	public void testStatsDHistogramReporting() throws Exception {
@@ -198,7 +202,7 @@ public class StatsDReporterTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that meters are properly reported via the StatsD reporter
+	 * Tests that meters are properly reported via the StatsD reporter.
 	 */
 	@Test
 	public void testStatsDMetersReporting() throws Exception {
@@ -241,7 +245,6 @@ public class StatsDReporterTest extends TestLogger {
 
 			Set<String> lines = receiver.getLines();
 
-
 			assertEquals(expectedLines, lines);
 
 		} finally {
@@ -260,7 +263,7 @@ public class StatsDReporterTest extends TestLogger {
 	}
 
 	/**
-	 * Testing StatsDReporter which disables the socket creation
+	 * Testing StatsDReporter which disables the socket creation.
 	 */
 	public static class TestingStatsDReporter extends StatsDReporter {
 		@Override
@@ -273,7 +276,7 @@ public class StatsDReporterTest extends TestLogger {
 		}
 	}
 
-	public static class TestingHistogram implements Histogram {
+	private static class TestingHistogram implements Histogram {
 
 		@Override
 		public void update(long value) {
@@ -326,7 +329,7 @@ public class StatsDReporterTest extends TestLogger {
 		}
 	}
 
-	public static class DatagramSocketReceiver implements Runnable {
+	private static class DatagramSocketReceiver implements Runnable {
 		private static final Object obj = new Object();
 
 		private final DatagramSocket socket;

http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index e1b66c2..3be21aa 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -60,4 +60,43 @@ under the License.
 		</dependency>
 	</dependencies>
 
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
 </project>


[6/8] flink git commit: [FLINK-6659] fix RocksDBMergeIteratorTest leaving temporary data behind

Posted by ch...@apache.org.
[FLINK-6659] fix RocksDBMergeIteratorTest leaving temporary data behind

-> use a JUnit '@Rule' that does the cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b58a420e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b58a420e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b58a420e

Branch: refs/heads/master
Commit: b58a420e3954f734192a7eff2e187349c4ed9959
Parents: dbcc456
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon May 22 16:47:35 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBMergeIteratorTest.java          | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b58a420e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 956ef2f..f5bcf86 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -21,16 +21,16 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksIterator;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -43,6 +43,9 @@ public class RocksDBMergeIteratorTest {
 	private static final int NUM_KEY_VAL_STATES = 50;
 	private static final int MAX_NUM_KEYS = 20;
 
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
 	@Test
 	public void testEmptyMergeIterator() throws IOException {
 		RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
@@ -51,19 +54,23 @@ public class RocksDBMergeIteratorTest {
 	}
 
 	@Test
-	public void testMergeIterator() throws Exception {
+	public void testMergeIteratorByte() throws Exception {
 		Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
 
 		testMergeIterator(Byte.MAX_VALUE);
+	}
+
+	@Test
+	public void testMergeIteratorShort() throws Exception {
+		Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
+
 		testMergeIterator(Short.MAX_VALUE);
 	}
 
 	public void testMergeIterator(int maxParallelism) throws Exception {
 		Random random = new Random(1234);
 
-		File tmpDir = CommonTestUtils.createTempDirectory();
-
-		RocksDB rocksDB = RocksDB.open(tmpDir.getAbsolutePath());
+		RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath());
 		try {
 			List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
 			List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();


[4/8] flink git commit: [FLINK-6675] Activate strict checkstyle for flink-annotations

Posted by ch...@apache.org.
[FLINK-6675] Activate strict checkstyle for flink-annotations

This closes #3970.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbcc456a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbcc456a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbcc456a

Branch: refs/heads/master
Commit: dbcc456a652e980323b1b23692578e3c22e25e68
Parents: 04fae53
Author: zentol <ch...@apache.org>
Authored: Tue May 23 13:47:53 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200

----------------------------------------------------------------------
 flink-annotations/pom.xml                       | 38 ++++++++++++++++++++
 .../org/apache/flink/annotation/Internal.java   |  1 +
 .../org/apache/flink/annotation/Public.java     |  1 +
 .../apache/flink/annotation/PublicEvolving.java |  1 +
 .../flink/annotation/VisibleForTesting.java     |  3 +-
 5 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/pom.xml
----------------------------------------------------------------------
diff --git a/flink-annotations/pom.xml b/flink-annotations/pom.xml
index b375611..08e10f5 100644
--- a/flink-annotations/pom.xml
+++ b/flink-annotations/pom.xml
@@ -34,4 +34,42 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
index dd9f080..65231c2 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  *
  */
+
 package org.apache.flink.annotation;
 
 import java.lang.annotation.Documented;

http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
index d891a0a..1e3bd09 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  *
  */
+
 package org.apache.flink.annotation;
 
 import java.lang.annotation.Documented;

http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java b/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
index 0c093a2..3389e71 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  *
  */
+
 package org.apache.flink.annotation;
 
 import java.lang.annotation.Documented;

http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
index 8f945a9..bed7db0 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  *
  */
+
 package org.apache.flink.annotation;
 
 import java.lang.annotation.Documented;
@@ -25,7 +26,7 @@ import java.lang.annotation.Target;
 /**
  * This annotations declares that a function, field, constructor, or entire type, is only visible for
  * testing purposes.
- * 
+ *
  * <p>This annotation is typically attached when for example a method should be {@code private}
  * (because it is not intended to be called externally), but cannot be declared private, because
  * some tests need to have access to it.