You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/02 13:14:19 UTC

[1/6] flink git commit: [hotfix] Update DataSet docs concerning JDBCInputFormat.

Repository: flink
Updated Branches:
  refs/heads/master cd5b4a630 -> 94ade3de9


[hotfix] Update DataSet docs concerning JDBCInputFormat.

The way to provide type information was outdated.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94ade3de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94ade3de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94ade3de

Branch: refs/heads/master
Commit: 94ade3de9f7345820e869940ced40bce3e9fae38
Parents: 5605107
Author: Maximilian Bode <ma...@tngtech.com>
Authored: Wed May 31 18:48:51 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 docs/dev/batch/index.md | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94ade3de/docs/dev/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index 6fccc04..9a8ce22 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -890,14 +890,12 @@ DataSet<Long> numbers = env.generateSequence(1, 10000000);
 // Read data from a relational database using the JDBC input format
 DataSet<Tuple2<String, Integer>> dbData =
     env.createInput(
-      // create and configure input format
       JDBCInputFormat.buildJDBCInputFormat()
                      .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                      .setDBUrl("jdbc:derby:memory:persons")
                      .setQuery("select name, age from persons")
-                     .finish(),
-      // specify type information for DataSet
-      new TupleTypeInfo(Tuple2.class, STRING_TYPE_INFO, INT_TYPE_INFO)
+                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+                     .finish()
     );
 
 // Note: Flink's program compiler needs to infer the data types of the data items which are returned


[3/6] flink git commit: [FLINK-6793] Activate checkstyle for runtime/metrics

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 11f3c0f..bd85303 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -39,6 +39,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link TaskManagerMetricGroup}.
+ */
 public class TaskManagerGroupTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
@@ -74,24 +77,24 @@ public class TaskManagerGroupTest extends TestLogger {
 			jid1, jobName1, vertex12, execution12, "test", 13, 1);
 		TaskMetricGroup tmGroup21 = group.addTaskForJob(
 			jid2, jobName2, vertex21, execution21, "test", 7, 2);
-		
+
 		assertEquals(2, group.numRegisteredJobMetricGroups());
 		assertFalse(tmGroup11.parent().isClosed());
 		assertFalse(tmGroup12.parent().isClosed());
 		assertFalse(tmGroup21.parent().isClosed());
-		
+
 		// close all for job 2 and one from job 1
 		tmGroup11.close();
 		tmGroup21.close();
 		assertTrue(tmGroup11.isClosed());
 		assertTrue(tmGroup21.isClosed());
-		
+
 		// job 2 should be removed, job 1 should still be there
 		assertFalse(tmGroup11.parent().isClosed());
 		assertFalse(tmGroup12.parent().isClosed());
 		assertTrue(tmGroup21.parent().isClosed());
 		assertEquals(1, group.numRegisteredJobMetricGroups());
-		
+
 		// add one more to job one
 		TaskMetricGroup tmGroup13 = group.addTaskForJob(
 			jid1, jobName1, vertex13, execution13, "test", 0, 0);
@@ -101,7 +104,7 @@ public class TaskManagerGroupTest extends TestLogger {
 		assertTrue(tmGroup11.parent().isClosed());
 		assertTrue(tmGroup12.parent().isClosed());
 		assertTrue(tmGroup13.parent().isClosed());
-		
+
 		assertEquals(0, group.numRegisteredJobMetricGroups());
 
 		registry.shutdown();
@@ -111,8 +114,7 @@ public class TaskManagerGroupTest extends TestLogger {
 	public void testCloseClosesAll() throws IOException {
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
-				registry, "localhost", new AbstractID().toString());
-
+			registry, "localhost", new AbstractID().toString());
 
 		final JobID jid1 = new JobID();
 		final JobID jid2 = new JobID();
@@ -136,14 +138,14 @@ public class TaskManagerGroupTest extends TestLogger {
 			jid2, jobName2, vertex21, execution21, "test", 7, 1);
 
 		group.close();
-		
+
 		assertTrue(tmGroup11.isClosed());
 		assertTrue(tmGroup12.isClosed());
 		assertTrue(tmGroup21.isClosed());
 
 		registry.shutdown();
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  scope name tests
 	// ------------------------------------------------------------------------
@@ -153,7 +155,7 @@ public class TaskManagerGroupTest extends TestLogger {
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id");
 
-		assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents());
+		assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents());
 		assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name"));
 		registry.shutdown();
 	}
@@ -165,7 +167,7 @@ public class TaskManagerGroupTest extends TestLogger {
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
 
-		assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents());
+		assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
 		assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name"));
 		registry.shutdown();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index f9891cb..fc0ce5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -22,16 +22,19 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link TaskManagerJobMetricGroup}.
+ */
 public class TaskManagerJobGroupTest extends TestLogger {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 183237a..22b0a1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -28,12 +28,16 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link TaskMetricGroup}.
+ */
 public class TaskMetricGroupTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
@@ -51,7 +55,7 @@ public class TaskMetricGroupTest extends TestLogger {
 		TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2);
 
 		assertArrayEquals(
-				new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "aTaskName", "13"},
+				new String[]{"theHostName", "taskmanager", "test-tm-id", "myJobName", "aTaskName", "13"},
 				taskGroup.getScopeComponents());
 
 		assertEquals(
@@ -78,7 +82,7 @@ public class TaskMetricGroupTest extends TestLogger {
 				registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2);
 
 		assertArrayEquals(
-				new String[] { "test-tm-id", jid.toString(), vertexId.toString(), executionId.toString() },
+				new String[]{"test-tm-id", jid.toString(), vertexId.toString(), executionId.toString()},
 				taskGroup.getScopeComponents());
 
 		assertEquals(
@@ -102,7 +106,7 @@ public class TaskMetricGroupTest extends TestLogger {
 				registry, jmGroup, new AbstractID(), executionId, "aTaskName", 13, 1);
 
 		assertArrayEquals(
-				new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13" },
+				new String[]{"theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13"},
 				taskGroup.getScopeComponents());
 
 		assertEquals(

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
index 601f734..15bf718 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
@@ -15,10 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.util;
 
 import org.apache.flink.metrics.CharacterFilter;
 
+/**
+ * A {@link CharacterFilter} that returns the given string without any modification.
+ */
 public class DummyCharacterFilter implements CharacterFilter {
 	@Override
 	public String filterCharacters(String input) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
index cd96c60..e5e09d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
@@ -23,6 +23,9 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.AbstractReporter;
 
+/**
+ * No-op reporter implementation.
+ */
 public class TestReporter extends AbstractReporter {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
index 82f8504..8565ed3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
@@ -15,11 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.util;
 
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 
+/**
+ * Stateless test histogram for which all methods return a static value.
+ */
 public class TestingHistogram implements Histogram {
 
 	@Override


[2/6] flink git commit: [FLINK-6795] Activate checkstyle for runtime/process

Posted by ch...@apache.org.
[FLINK-6795] Activate checkstyle for runtime/process

This closes #4040.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c041dd81
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c041dd81
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c041dd81

Branch: refs/heads/master
Commit: c041dd81f66b5f53ae8e1911016949eb4c3ff530
Parents: 00a2666
Author: zentol <ch...@apache.org>
Authored: Thu Jun 1 12:25:42 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                                              | 1 -
 .../main/java/org/apache/flink/runtime/process/ProcessReaper.java  | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c041dd81/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 0f8e2e9..38ab32b 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -450,7 +450,6 @@ under the License.
 						**/runtime/net/**,
 						**/runtime/operators/**,
 						**/runtime/plugable/**,
-						**/runtime/process/**,
 						**/runtime/query/**,
 						**/runtime/registration/**,
 						**/runtime/resourcemanager/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/c041dd81/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index 09e1839..77ec3eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -50,7 +50,7 @@ public class ProcessReaper extends UntypedActor {
 				String name = term.actor().path().toSerializationFormat();
 				if (log != null) {
 					log.error("Actor " + name + " terminated, stopping process...");
-					
+
 					// give the log some time to reach disk
 					try {
 						Thread.sleep(100);


[6/6] flink git commit: [FLINK-6794] Activate checkstyle for migration/**

Posted by ch...@apache.org.
[FLINK-6794] Activate checkstyle for migration/**

This closes #4038.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00a26668
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00a26668
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00a26668

Branch: refs/heads/master
Commit: 00a266680c8e962fb158472821e1d78660385a2e
Parents: 2b3d284
Author: zentol <ch...@apache.org>
Authored: Thu Jun 1 12:33:09 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                                           | 5 -----
 .../src/main/java/org/apache/flink/migration/MigrationUtil.java | 3 +++
 .../migration/streaming/runtime/tasks/StreamTaskState.java      | 5 ++---
 .../migration/streaming/runtime/tasks/StreamTaskStateList.java  | 2 +-
 4 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d83d671..0f8e2e9 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -420,11 +420,6 @@ under the License.
 					<logViolationsToConsole>true</logViolationsToConsole>
 					<failOnViolation>true</failOnViolation>
 					<excludes>
-						**/migration/api/**,
-						**/migration/runtime/**,
-						**/migration/state/**,
-						**/migration/streaming/**,
-						**/migration/*,
 						**/runtime/akka/**,
 						**/runtime/blob/**,
 						**/runtime/broadcast/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
index a4e3a2e..a6055a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
@@ -23,6 +23,9 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import java.util.Collection;
 
+/**
+ * Utility functions for migration.
+ */
 public class MigrationUtil {
 
 	@SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
index 7a2aab9..b044ffb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
@@ -36,7 +36,7 @@ import java.util.HashMap;
 public class StreamTaskState implements Serializable, Closeable {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private StateHandle<?> operatorState;
 
 	private StateHandle<Serializable> functionState;
@@ -74,14 +74,13 @@ public class StreamTaskState implements Serializable, Closeable {
 	/**
 	 * Checks if this state object actually contains any state, or if all of the state
 	 * fields are null.
-	 * 
+	 *
 	 * @return True, if all state is null, false if at least one state is not null.
 	 */
 	public boolean isEmpty() {
 		return operatorState == null & functionState == null & kvStates == null;
 	}
 
-
 	@Override
 	public void close() throws IOException {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
index c2357f7..7643039 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
@@ -35,7 +35,7 @@ public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
 
 	private static final long serialVersionUID = 1L;
 
-	/** The states for all operator */
+	/** The states for all operators. */
 	private final StreamTaskState[] states;
 
 	public StreamTaskStateList(StreamTaskState[] states) throws Exception {


[4/6] flink git commit: [FLINK-6793] Activate checkstyle for runtime/metrics

Posted by ch...@apache.org.
[FLINK-6793] Activate checkstyle for runtime/metrics

This closes #4037.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b3d284b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b3d284b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b3d284b

Branch: refs/heads/master
Commit: 2b3d284bf09a30edafa4dadf50492156bb47027b
Parents: cd5b4a6
Author: zentol <ch...@apache.org>
Authored: Wed May 31 16:39:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../flink/runtime/metrics/MetricNames.java      |  4 ++
 .../flink/runtime/metrics/MetricRegistry.java   | 22 ++++---
 .../metrics/MetricRegistryConfiguration.java    | 11 ++--
 .../flink/runtime/metrics/ViewUpdater.java      |  1 +
 .../flink/runtime/metrics/dump/MetricDump.java  |  4 +-
 .../metrics/dump/MetricDumpSerialization.java   | 31 ++++++----
 .../metrics/dump/MetricQueryService.java        | 16 ++---
 .../runtime/metrics/dump/QueryScopeInfo.java    |  5 +-
 .../metrics/groups/AbstractMetricGroup.java     | 52 ++++++++--------
 .../metrics/groups/ComponentMetricGroup.java    |  8 +--
 .../metrics/groups/GenericMetricGroup.java      |  2 +-
 .../groups/JobManagerJobMetricGroup.java        |  2 +
 .../metrics/groups/JobManagerMetricGroup.java   |  1 +
 .../runtime/metrics/groups/JobMetricGroup.java  |  9 +--
 .../metrics/groups/OperatorIOMetricGroup.java   |  3 +-
 .../metrics/groups/OperatorMetricGroup.java     |  4 +-
 .../metrics/groups/TaskIOMetricGroup.java       |  4 +-
 .../groups/TaskManagerJobMetricGroup.java       |  4 +-
 .../metrics/groups/TaskManagerMetricGroup.java  |  1 -
 .../runtime/metrics/groups/TaskMetricGroup.java |  9 +--
 .../runtime/metrics/scope/ScopeFormat.java      | 18 +++---
 .../runtime/metrics/scope/ScopeFormats.java     | 10 ++-
 .../flink/runtime/metrics/util/MetricUtils.java |  7 ++-
 .../runtime/metrics/MetricRegistryTest.java     | 65 +++++++++++++++-----
 .../runtime/metrics/TaskManagerMetricsTest.java | 23 ++++---
 .../metrics/dump/MetricDumpSerializerTest.java  | 22 ++++---
 .../runtime/metrics/dump/MetricDumpTest.java    |  4 ++
 .../metrics/dump/MetricQueryServiceTest.java    | 15 +++--
 .../metrics/dump/QueryScopeInfoTest.java        |  4 ++
 .../metrics/groups/AbstractMetricGroupTest.java | 16 ++++-
 .../metrics/groups/JobManagerGroupTest.java     |  5 ++
 .../metrics/groups/JobManagerJobGroupTest.java  |  4 ++
 .../groups/MetricGroupRegistrationTest.java     | 13 +++-
 .../runtime/metrics/groups/MetricGroupTest.java | 38 +++++++-----
 .../metrics/groups/OperatorGroupTest.java       |  6 +-
 .../metrics/groups/QueryScopeInfoTest.java      |  9 ++-
 .../metrics/groups/TaskIOMetricGroupTest.java   |  7 ++-
 .../metrics/groups/TaskManagerGroupTest.java    | 24 ++++----
 .../metrics/groups/TaskManagerJobGroupTest.java |  5 +-
 .../metrics/groups/TaskMetricGroupTest.java     | 10 ++-
 .../metrics/util/DummyCharacterFilter.java      |  4 ++
 .../runtime/metrics/util/TestReporter.java      |  3 +
 .../runtime/metrics/util/TestingHistogram.java  |  4 ++
 44 files changed, 331 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 2272bc7..d83d671 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -451,7 +451,6 @@ under the License.
 						**/runtime/leaderretrieval/**,
 						**/runtime/memory/**,
 						**/runtime/messages/**,
-						**/runtime/metrics/**,
 						**/runtime/minicluster/**,
 						**/runtime/net/**,
 						**/runtime/operators/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 9202ca1..300b4b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -15,8 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics;
 
+/**
+ * Collection of metric names.
+ */
 public class MetricNames {
 	private MetricNames() {
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 9f46d47..5018cbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.metrics;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.pattern.Patterns;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -38,11 +34,13 @@ import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -53,6 +51,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
  * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
@@ -154,7 +156,7 @@ public class MetricRegistry {
 
 	/**
 	 * Initializes the MetricQueryService.
-	 * 
+	 *
 	 * @param actorSystem ActorSystem to create the MetricQueryService on
 	 * @param resourceID resource ID used to disambiguate the actor name
      */
@@ -250,7 +252,7 @@ public class MetricRegistry {
 			}
 		}
 	}
-	
+
 	private void shutdownExecutor() {
 		if (executor != null) {
 			executor.shutdown();
@@ -361,7 +363,7 @@ public class MetricRegistry {
 	 * This task is explicitly a static class, so that it does not hold any references to the enclosing
 	 * MetricsRegistry instance.
 	 *
-	 * This is a subtle difference, but very important: With this static class, the enclosing class instance
+	 * <p>This is a subtle difference, but very important: With this static class, the enclosing class instance
 	 * may become garbage-collectible, whereas with an anonymous inner class, the timer thread
 	 * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer.
 	 * Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 3475f04..6f6db86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,7 @@ public class MetricRegistryConfiguration {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MetricRegistryConfiguration.class);
 
-	private static volatile MetricRegistryConfiguration DEFAULT_CONFIGURATION;
+	private static volatile MetricRegistryConfiguration defaultConfiguration;
 
 	// regex pattern to split the defined reporters
 	private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*");
@@ -148,15 +149,15 @@ public class MetricRegistryConfiguration {
 
 	public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
 		// create the default metric registry configuration only once
-		if (DEFAULT_CONFIGURATION == null) {
+		if (defaultConfiguration == null) {
 			synchronized (MetricRegistryConfiguration.class) {
-				if (DEFAULT_CONFIGURATION == null) {
-					DEFAULT_CONFIGURATION = fromConfiguration(new Configuration());
+				if (defaultConfiguration == null) {
+					defaultConfiguration = fromConfiguration(new Configuration());
 				}
 			}
 		}
 
-		return DEFAULT_CONFIGURATION;
+		return defaultConfiguration;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
index e4d0596..77bd0d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.metrics.View;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
index 2239b50..202e453 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.apache.flink.util.Preconditions;
@@ -121,7 +122,7 @@ public abstract class MetricDump {
 	}
 
 	/**
-	 * Container for the rate of a {@link org.apache.flink.metrics.Meter}. 
+	 * Container for the rate of a {@link org.apache.flink.metrics.Meter}.
 	 */
 	public static class MeterDump extends MetricDump {
 		public final double rate;
@@ -130,6 +131,7 @@ public abstract class MetricDump {
 			super(scopeInfo, name);
 			this.rate = rate;
 		}
+
 		@Override
 		public byte getCategory() {
 			return METRIC_CATEGORY_METER;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index e57a0d8..e173522 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -27,6 +28,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,14 +58,14 @@ public class MetricDumpSerialization {
 
 	/**
 	 * This class encapsulates all serialized metrics and a count for each metric type.
-	 * 
-	 * The counts are stored separately from the metrics since the final count for any given type can only be
+	 *
+	 * <p>The counts are stored separately from the metrics since the final count for any given type can only be
 	 * determined after all metrics of that type were serialized. Storing them together in a single byte[] would
 	 * require an additional copy of all serialized metrics, as you would first have to serialize the metrics into a
 	 * temporary buffer to calculate the counts, write the counts to the final output and copy all metrics from the
 	 * temporary buffer.
-	 * 
-	 * Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag)
+	 *
+	 * <p>Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag)
 	 * is written for each metric, this would require more bandwidth due to the sheer number of metrics.
 	 */
 	public static class MetricSerializationResult implements Serializable {
@@ -75,12 +77,12 @@ public class MetricDumpSerialization {
 		public final int numGauges;
 		public final int numMeters;
 		public final int numHistograms;
-		
+
 		public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
 			Preconditions.checkNotNull(serializedMetrics);
 			Preconditions.checkArgument(numCounters >= 0);
 			Preconditions.checkArgument(numGauges >= 0);
-			Preconditions.checkArgument(numMeters >= 0); 
+			Preconditions.checkArgument(numMeters >= 0);
 			Preconditions.checkArgument(numHistograms >= 0);
 			this.serializedMetrics = serializedMetrics;
 			this.numCounters = numCounters;
@@ -94,18 +96,21 @@ public class MetricDumpSerialization {
 	// Serialization
 	//-------------------------------------------------------------------------
 
+	/**
+	 * Serializes a set of metrics into a {@link MetricSerializationResult}.
+	 */
 	public static class MetricDumpSerializer {
 
 		private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
 
 		/**
 		 * Serializes the given metrics and returns the resulting byte array.
-		 * 
-		 * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
+		 *
+		 * <p>Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
 		 * {@link MetricSerializationResult}.
-		 * 
-		 * If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult}
-		 * is partially corrupted. Such a result can be deserialized safely by 
+		 *
+		 * <p>If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult}
+		 * is partially corrupted. Such a result can be deserialized safely by
 		 * {@link MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only metrics that were
 		 * fully serialized before the failure will be returned.
 		 *
@@ -263,6 +268,9 @@ public class MetricDumpSerialization {
 	// Deserialization
 	//-------------------------------------------------------------------------
 
+	/**
+	 * Deserializer for reading a list of {@link MetricDump MetricDumps} from a {@link MetricSerializationResult}.
+	 */
 	public static class MetricDumpDeserializer {
 		/**
 		 * De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}.
@@ -311,7 +319,6 @@ public class MetricDumpSerialization {
 		}
 	}
 
-
 	private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
 		QueryScopeInfo scope = deserializeMetricInfo(dis);
 		String name = dis.readUTF();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 2229139..8821e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -15,13 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
@@ -31,6 +27,12 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +46,7 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
 /**
  * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
  *
- * It is realized as an actor and can be notified of
+ * <p>It is realized as an actor and can be notified of
  * - an added metric by calling {@link MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, AbstractMetricGroup)}
  * - a removed metric by calling {@link MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)}
  * - a metric dump request by sending the return value of {@link MetricQueryService#getCreateDump()}
@@ -217,6 +219,6 @@ public class MetricQueryService extends UntypedActor {
 	}
 
 	private static class CreateDump implements Serializable {
-		private static CreateDump INSTANCE = new CreateDump();
+		private static final CreateDump INSTANCE = new CreateDump();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index 6572ca0..9af9d78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 /**
@@ -28,7 +29,7 @@ public abstract class QueryScopeInfo {
 	public static final byte INFO_CATEGORY_TASK = 3;
 	public static final byte INFO_CATEGORY_OPERATOR = 4;
 
-	/** The remaining scope not covered by specific fields */
+	/** The remaining scope not covered by specific fields. */
 	public final String scope;
 
 	private QueryScopeInfo(String scope) {
@@ -45,7 +46,7 @@ public abstract class QueryScopeInfo {
 
 	/**
 	 * Returns the category for this QueryScopeInfo.
-	 * 
+	 *
 	 * @return category
      */
 	public abstract byte getCategory();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index a19970d..c67c5ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,9 +42,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups.
- * 
+ *
  * <p><b>IMPORTANT IMPLEMENTATION NOTE</b>
- * 
+ *
  * <p>This class uses locks for adding and removing metrics objects. This is done to
  * prevent resource leaks in the presence of concurrently closing a group and adding
  * metrics and subgroups.
@@ -56,30 +57,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * from any metrics reporter and any internal maps. Note that even closed metrics groups
  * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
  * These metrics simply do not get reported any more, when created on a closed group.
- * 
+ *
  * @param <A> The type of the parent MetricGroup
  */
 @Internal
 public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup {
 
-	/** shared logger */
 	private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
 
 	// ------------------------------------------------------------------------
 
-	/** The parent group containing this group */
+	/** The parent group containing this group. */
 	protected final A parent;
 
 	/** The map containing all variables and their associated values, lazily computed. */
 	protected volatile Map<String, String> variables;
-	
-	/** The registry that this metrics group belongs to */
+
+	/** The registry that this metrics group belongs to. */
 	protected final MetricRegistry registry;
 
-	/** All metrics that are directly contained in this group */
+	/** All metrics that are directly contained in this group. */
 	private final Map<String, Metric> metrics = new HashMap<>();
 
-	/** All metric subgroups of this group */
+	/** All metric subgroups of this group. */
 	private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
 
 	/** The metrics scope represented by this group.
@@ -97,7 +97,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	/** The metrics query service scope represented by this group, lazily computed. */
 	protected QueryScopeInfo queryServiceScopeInfo;
 
-	/** Flag indicating whether this group has been closed */
+	/** Flag indicating whether this group has been closed. */
 	private volatile boolean closed;
 
 	// ------------------------------------------------------------------------
@@ -111,7 +111,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	public Map<String, String> getAllVariables() {
 		if (variables == null) { // avoid synchronization for common case
-			synchronized(this) {
+			synchronized (this) {
 				if (variables == null) {
 					if (parent != null) {
 						variables = parent.getAllVariables();
@@ -126,7 +126,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Returns the logical scope of this group, for example
-	 * {@code "taskmanager.job.task"}
+	 * {@code "taskmanager.job.task"}.
 	 *
 	 * @param filter character filter which is applied to the scope components
 	 * @return logical scope
@@ -137,7 +137,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Returns the logical scope of this group, for example
-	 * {@code "taskmanager.job.task"}
+	 * {@code "taskmanager.job.task"}.
 	 *
 	 * @param filter character filter which is applied to the scope components
 	 * @return logical scope
@@ -155,7 +155,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Returns the name for this group, meaning what kind of entity it represents, for example "taskmanager".
-	 * 
+	 *
 	 * @param filter character filter which is applied to the name
 	 * @return logical name for this group
 	 */
@@ -163,9 +163,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Gets the scope as an array of the scope components, for example
-	 * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
-	 * 
-	 * @see #getMetricIdentifier(String) 
+	 * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}.
+	 *
+	 * @see #getMetricIdentifier(String)
 	 */
 	public String[] getScopeComponents() {
 		return scopeComponents;
@@ -173,7 +173,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Returns the metric query service scope for this group.
-	 * 
+	 *
 	 * @param filter character filter
 	 * @return query service scope
      */
@@ -194,8 +194,8 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Returns the fully qualified metric name, for example
-	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
-	 * 
+	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
+	 *
 	 * @param metricName metric name
 	 * @return fully qualified metric name
 	 */
@@ -205,7 +205,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Returns the fully qualified metric name, for example
-	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
 	 *
 	 * @param metricName metric name
 	 * @param filter character filter which is applied to the scope components if not null.
@@ -217,7 +217,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	/**
 	 * Returns the fully qualified metric name using the configured delimiter for the reporter with the given index, for example
-	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
 	 *
 	 * @param metricName metric name
 	 * @param filter character filter which is applied to the scope components if not null.
@@ -250,7 +250,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 			return scopeStrings[reporterIndex] + delimiter + metricName;
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Closing
 	// ------------------------------------------------------------------------
@@ -292,7 +292,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	public Counter counter(String name) {
 		return counter(name, new SimpleCounter());
 	}
-	
+
 	@Override
 	public <C extends Counter> C counter(int name, C counter) {
 		return counter(String.valueOf(name), counter);
@@ -340,7 +340,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	/**
 	 * Adds the given metric to the group and registers it at the registry, if the group
 	 * is not yet closed, and if no metric with the same name has been registered before.
-	 * 
+	 *
 	 * @param name the name to register the metric under
 	 * @param metric the metric to register
 	 */
@@ -372,7 +372,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 				else {
 					// we had a collision. put back the original value
 					metrics.put(name, prior);
-					
+
 					// we warn here, rather than failing, because metrics are tools that should not fail the
 					// program when used incorrectly
 					LOG.warn("Name collision: Group already contains a Metric with the name '" +

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
index 9f0f483..0cd9942 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
@@ -25,9 +25,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., 
+ * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g.,
  * TaskManager, Job, Task, Operator).
- * 
+ *
  * <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. For example
  * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a
  * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope.
@@ -36,7 +36,7 @@ import java.util.Map;
  * certain identifiers from the scope. The scope for metrics belonging to the "Task"
  * group could for example include the task attempt number (more fine grained identification), or
  * exclude it (for continuity of the namespace across failure and recovery).
- * 
+ *
  * @param <P> The type of the parent MetricGroup.
  */
 @Internal
@@ -101,7 +101,7 @@ public abstract class ComponentMetricGroup<P extends AbstractMetricGroup<?>> ext
 
 	/**
 	 * Gets all component metric groups that are contained in this component metric group.
-	 * 
+	 *
 	 * @return All component metric groups that are contained in this component metric group.
 	 */
 	protected abstract Iterable<? extends ComponentMetricGroup> subComponents();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
index 5978f2d..ee7d1ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
  */
 @Internal
 public class GenericMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> {
-	/** The name of this group */
+	/** The name of this group. */
 	private String name;
 
 	public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
index c4902e8..b62c7b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
@@ -22,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
 import javax.annotation.Nullable;
+
 import java.util.Collections;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index 5a35110..e09051d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
index 17f6189..876e794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -26,21 +26,22 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import javax.annotation.Nullable;
+
 import java.util.Map;
 
 /**
  * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
  * a specific job.
- * 
+ *
  * @param <C> The type of the parent ComponentMetricGroup.
  */
 @Internal
 public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends ComponentMetricGroup<C> {
 
-	/** The ID of the job represented by this metrics group */
+	/** The ID of the job represented by this metrics group. */
 	protected final JobID jobId;
 
-	/** The name of the job represented by this metrics group */
+	/** The name of the job represented by this metrics group. */
 	@Nullable
 	protected final String jobName;
 
@@ -53,7 +54,7 @@ public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends
 			@Nullable String jobName,
 			String[] scope) {
 		super(registry, scope, parent);
-		
+
 		this.jobId = jobId;
 		this.jobName = jobName;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 5bf7d1f..6bf3c6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
@@ -64,7 +65,7 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 	public void reuseInputMetricsForTask() {
 		TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
 		taskIO.reuseRecordsInputCounter(this.numRecordsIn);
-		
+
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index 37c9dd8..2313873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -46,7 +46,7 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	public final TaskMetricGroup parent() {
 		return parent;
 	}
@@ -68,7 +68,7 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
 	public OperatorIOMetricGroup getIOMetricGroup() {
 		return ioMetrics;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Component Metric Group Specifics
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 38accad..e12ecd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -24,11 +24,11 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -110,7 +110,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	// ============================================================================================
 
 	/**
-	 * Initialize Buffer Metrics for a task
+	 * Initialize Buffer Metrics for a task.
 	 */
 	public void initializeBufferMetrics(Task task) {
 		final MetricGroup buffers = addGroup("buffers");

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 79a87d0..3921553 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
@@ -25,6 +26,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricGroup> {
 
-	/** Map from execution attempt ID (task identifier) to task metrics */
+	/** Map from execution attempt ID (task identifier) to task metrics. */
 	private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index 92c509a..d85e868 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -46,7 +46,6 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
 
 	private final String taskManagerId;
 
-
 	public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) {
 		super(registry, registry.getScopeFormats().getTaskManagerFormat().formatScope(hostname, taskManagerId), null);
 		this.hostname = hostname;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 43e8e1b..cb7aaa0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -33,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink runtime Task.
- * 
+ *
  * <p>Contains extra logic for adding operators.
  */
 @Internal
@@ -42,13 +43,13 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
 	private final TaskIOMetricGroup ioMetrics;
-	
-	/** The execution Id uniquely identifying the executed task represented by this metrics group */
+
+	/** The execution Id uniquely identifying the executed task represented by this metrics group. */
 	private final AbstractID executionId;
 
 	@Nullable
 	protected final AbstractID vertexId;
-	
+
 	@Nullable
 	private final String taskName;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index f9efb88..18b3a0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -52,7 +52,7 @@ public abstract class ScopeFormat {
 	/**
 	 * If the scope format starts with this character, then the parent components scope
 	 * format will be used as a prefix.
-	 * 
+	 *
 	 * <p>For example, if the TaskManager's job format is {@code "*.<job_name>"}, and the
 	 * TaskManager format is {@code "<host>"}, then the job's metrics
 	 * will have {@code "<host>.<job_name>"} as their scope.
@@ -90,16 +90,16 @@ public abstract class ScopeFormat {
 	// ----- Operator ----
 
 	public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name");
-	
+
 
 	// ------------------------------------------------------------------------
 	//  Scope Format Base
 	// ------------------------------------------------------------------------
 
-	/** The scope format */
+	/** The scope format. */
 	private final String format;
 
-	/** The format, split into components */
+	/** The format, split into components. */
 	private final String[] template;
 
 	private final int[] templatePos;
@@ -125,7 +125,7 @@ public abstract class ScopeFormat {
 
 			String[] parentTemplate = parent.template;
 			int parentLen = parentTemplate.length;
-			
+
 			this.template = new String[parentLen + rawComponents.length - 1];
 			System.arraycopy(parentTemplate, 0, this.template, 0, parentLen);
 			System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1);
@@ -137,14 +137,14 @@ public abstract class ScopeFormat {
 
 		// --- compute the replacement matrix ---
 		// a bit of clumsy Java collections code ;-)
-		
+
 		HashMap<String, Integer> varToValuePos = arrayToMap(variables);
 		List<Integer> templatePos = new ArrayList<>();
 		List<Integer> valuePos = new ArrayList<>();
 
 		for (int i = 0; i < template.length; i++) {
 			final String component = template[i];
-			
+
 			// check if that is a variable
 			if (component != null && component.length() >= 3 &&
 					component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') {
@@ -188,7 +188,7 @@ public abstract class ScopeFormat {
 	public String toString() {
 		return "ScopeFormat '" + format + '\'';
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -233,7 +233,7 @@ public abstract class ScopeFormat {
 		}
 		return sb.toString();
 	}
-	
+
 	protected static String valueOrNull(Object value) {
 		return (value == null || (value instanceof String && ((String) value).isEmpty())) ?
 				"null" : value.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
index bde93be..dc49d32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -67,8 +67,7 @@ public class ScopeFormats {
 			String taskManagerFormat,
 			String taskManagerJobFormat,
 			String taskFormat,
-			String operatorFormat)
-	{
+			String operatorFormat) {
 		this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat);
 		this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
 		this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat);
@@ -86,8 +85,7 @@ public class ScopeFormats {
 			TaskManagerScopeFormat taskManagerFormat,
 			TaskManagerJobScopeFormat taskManagerJobFormat,
 			TaskScopeFormat taskFormat,
-			OperatorScopeFormat operatorFormat)
-	{
+			OperatorScopeFormat operatorFormat) {
 		this.jobManagerFormat = checkNotNull(jobManagerFormat);
 		this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
 		this.taskManagerFormat = checkNotNull(taskManagerFormat);
@@ -129,8 +127,8 @@ public class ScopeFormats {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates the scope formats as defined in the given configuration
-	 * 
+	 * Creates the scope formats as defined in the given configuration.
+	 *
 	 * @param config The configuration that defines the formats
 	 * @return The ScopeFormats parsed from the configuration
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 4612eaf..2ecde42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.util;
 
-import org.apache.commons.lang3.text.WordUtils;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+
+import org.apache.commons.lang3.text.WordUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,9 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 
+/**
+ * Utility class to register pre-defined metric sets.
+ */
 public class MetricUtils {
 	private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class);
 	private static final String METRIC_GROUP_STATUS_NAME = "Status";

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index b9502b2..1de2551 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.metrics;
 
-import akka.actor.ActorNotFound;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
@@ -34,20 +31,27 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestReporter;
-
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link MetricRegistry}.
+ */
 public class MetricRegistryTest extends TestLogger {
 
 	private static final char GLOBAL_DEFAULT_DELIMITER = '.';
@@ -55,14 +59,14 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testIsShutdown() {
 		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-		
+
 		Assert.assertFalse(metricRegistry.isShutdown());
-		
+
 		metricRegistry.shutdown();
-		
+
 		Assert.assertTrue(metricRegistry.isShutdown());
 	}
-	
+
 	/**
 	 * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
 	 */
@@ -82,6 +86,9 @@ public class MetricRegistryTest extends TestLogger {
 		metricRegistry.shutdown();
 	}
 
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
 	protected static class TestReporter1 extends TestReporter {
 		public static boolean wasOpened = false;
 
@@ -114,6 +121,9 @@ public class MetricRegistryTest extends TestLogger {
 		metricRegistry.shutdown();
 	}
 
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
 	protected static class TestReporter11 extends TestReporter {
 		public static boolean wasOpened = false;
 
@@ -123,6 +133,9 @@ public class MetricRegistryTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
 	protected static class TestReporter12 extends TestReporter {
 		public static boolean wasOpened = false;
 
@@ -132,6 +145,9 @@ public class MetricRegistryTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
 	protected static class TestReporter13 extends TestReporter {
 		public static boolean wasOpened = false;
 
@@ -156,6 +172,9 @@ public class MetricRegistryTest extends TestLogger {
 		new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
 	}
 
+	/**
+	 * Reporter that verifies whether configured arguments were properly passed.
+	 */
 	protected static class TestReporter2 extends TestReporter {
 		@Override
 		public void open(MetricConfig config) {
@@ -186,9 +205,9 @@ public class MetricRegistryTest extends TestLogger {
 			int reportCount = TestReporter3.reportCount;
 			long curT = System.currentTimeMillis();
 			/**
-			 * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports. 
-			 * This value however does not not take the first triggered report into account (=> +1). 
-			 * Furthermore we have to account for the mis-alignment between reports being triggered and our time 
+			 * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports.
+			 * This value however does not take the first triggered report into account (=> +1).
+			 * Furthermore we have to account for the mis-alignment between reports being triggered and our time
 			 * measurement (=> +1); for T=200 a total of 4-6 reports may have been
 			 * triggered depending on whether the end of the interval for the first reports ends before
 			 * or after T=50.
@@ -201,6 +220,9 @@ public class MetricRegistryTest extends TestLogger {
 		registry.shutdown();
 	}
 
+	/**
+	 * Reporter that exposes how often report() was called.
+	 */
 	protected static class TestReporter3 extends TestReporter implements Scheduled {
 		public static int reportCount = 0;
 
@@ -234,6 +256,9 @@ public class MetricRegistryTest extends TestLogger {
 		registry.shutdown();
 	}
 
+	/**
+	 * Reporter that exposes whether it was notified of added or removed metrics.
+	 */
 	protected static class TestReporter6 extends TestReporter {
 		public static boolean addCalled = false;
 		public static boolean removeCalled = false;
@@ -253,6 +278,9 @@ public class MetricRegistryTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Reporter that exposes whether it was notified of added or removed metrics.
+	 */
 	protected static class TestReporter7 extends TestReporter {
 		public static boolean addCalled = false;
 		public static boolean removeCalled = false;
@@ -344,10 +372,10 @@ public class MetricRegistryTest extends TestLogger {
 
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 		List<MetricReporter> reporters = registry.getReporters();
-		((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; //test1  reporter
-		((TestReporter8)reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
-		((TestReporter8)reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
-		((TestReporter8)reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
+		((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1  reporter
+		((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
+		((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
+		((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
 
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
 		group.counter("C");
@@ -383,6 +411,9 @@ public class MetricRegistryTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier.
+	 */
 	public static class TestReporter8 extends TestReporter {
 		char expectedDelimiter;
 		public static int numCorrectDelimitersForRegister = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index d6fc48c..5e6bbce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -15,12 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.runtime.metrics;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
+package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -34,18 +30,25 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
 import org.junit.Assert;
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.net.InetAddress;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests for the behavior of the metric system on a task manager.
+ */
 public class TaskManagerMetricsTest extends TestLogger {
 
 	/**
@@ -78,7 +81,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 			final Configuration config = new Configuration();
 			final ResourceID tmResourceID = ResourceID.generate();
 
-			TaskManagerServicesConfiguration taskManagerServicesConfiguration = 
+			TaskManagerServicesConfiguration taskManagerServicesConfiguration =
 					TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), false);
 
 			TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config);
@@ -87,7 +90,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 					taskManagerServicesConfiguration, tmResourceID);
 
 			final MetricRegistry tmRegistry = taskManagerServices.getMetricRegistry();
-			
+
 			// create the task manager
 			final Props tmProps = TaskManager.getTaskManagerProps(
 				TaskManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 6e3d8f4..d36c469 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,6 +25,7 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -43,7 +45,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
+/**
+ * Tests for the {@link MetricDumpSerialization}.
+ */
 public class MetricDumpSerializerTest {
 	@Test
 	public void testNullGaugeHandling() throws IOException {
@@ -51,20 +55,20 @@ public class MetricDumpSerializerTest {
 		MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
 
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
-		
+
 		gauges.put(new Gauge<Object>() {
 			@Override
 			public Object getValue() {
 				return null;
 			}
 		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g"));
-		
+
 		MetricDumpSerialization.MetricSerializationResult output = serializer.serialize(
-			Collections.<Counter, Tuple2<QueryScopeInfo,String>>emptyMap(),
+			Collections.<Counter, Tuple2<QueryScopeInfo, String>>emptyMap(),
 			gauges,
 			Collections.<Histogram, Tuple2<QueryScopeInfo, String>>emptyMap(),
 			Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
-		
+
 		// no metrics should be serialized
 		Assert.assertEquals(0, output.serializedMetrics.length);
 
@@ -80,10 +84,10 @@ public class MetricDumpSerializerTest {
 		final ObjectOutputStream oos = new ObjectOutputStream(bos);
 
 		oos.writeObject(serializer.serialize(
-			new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(),
-			new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(),
-			new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(),
-			new HashMap<Meter, Tuple2<QueryScopeInfo,String>>()));
+			new HashMap<Counter, Tuple2<QueryScopeInfo, String>>(),
+			new HashMap<Gauge<?>, Tuple2<QueryScopeInfo, String>>(),
+			new HashMap<Histogram, Tuple2<QueryScopeInfo, String>>(),
+			new HashMap<Meter, Tuple2<QueryScopeInfo, String>>()));
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
index 3b65184..c7b9793 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.junit.Test;
@@ -25,6 +26,9 @@ import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_H
 import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link MetricDump} classes.
+ */
 public class MetricDumpTest {
 	@Test
 	public void testDumpedCounter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 2243495..5c33ad6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -15,13 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.testkit.TestActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -34,11 +30,20 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link MetricQueryService}.
+ */
 public class MetricQueryServiceTest extends TestLogger {
 	@Test
 	public void testCreateDump() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 597e376..f4f9515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.junit.Test;
@@ -26,6 +27,9 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY
 import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link QueryScopeInfo} classes.
+ */
 public class QueryScopeInfoTest {
 	@Test
 	public void testJobManagerMetricInfo() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 04e40ae..648ee47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -28,11 +29,15 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.TestReporter;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link AbstractMetricGroup}.
+ */
 public class AbstractMetricGroupTest {
 	/**
 	 * Verifies that no {@link NullPointerException} is thrown when {@link AbstractMetricGroup#getAllVariables()} is
@@ -54,7 +59,7 @@ public class AbstractMetricGroupTest {
 			}
 		};
 		assertTrue(group.getAllVariables().isEmpty());
-		
+
 		registry.shutdown();
 	}
 
@@ -119,11 +124,15 @@ public class AbstractMetricGroupTest {
 
 	}
 
+	/**
+	 * Reporter that verifies the scope caching behavior.
+	 */
 	public static class TestReporter1 extends ScopeCheckingTestReporter {
 		@Override
 		public String filterCharacters(String input) {
 			return FILTER_B.filterCharacters(input);
 		}
+
 		@Override
 		public void checkScopes(Metric metric, String metricName, MetricGroup group) {
 			// the first call determines which filter is applied to all future calls; in this case no filter is used at all
@@ -141,6 +150,9 @@ public class AbstractMetricGroupTest {
 		}
 	}
 
+	/**
+	 * Reporter that verifies the scope caching behavior.
+	 */
 	public static class TestReporter2 extends ScopeCheckingTestReporter {
 		@Override
 		public String filterCharacters(String input) {
@@ -173,7 +185,7 @@ public class AbstractMetricGroupTest {
 		try {
 			TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id");
 			assertEquals("MetricReporters list should be empty", 0, testRegistry.getReporters().size());
-			
+
 			// default delimiter should be used
 			assertEquals("A.B.X.D.1", group.getMetricIdentifier("1", FILTER_C));
 			// no caching should occur

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 7834755..03341a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
@@ -26,12 +27,16 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link JobManagerMetricGroup}.
+ */
 public class JobManagerGroupTest extends TestLogger {
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index d8bd57a..d734dfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -26,11 +26,15 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link JobManagerJobMetricGroup}.
+ */
 public class JobManagerJobGroupTest extends TestLogger {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index ace0236..56ce5fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -33,8 +34,11 @@ import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the registration of groups and metrics on a {@link MetricGroup}.
+ */
 public class MetricGroupRegistrationTest {
 	/**
 	 * Verifies that group methods instantiate the correct metric with the given name.
@@ -59,7 +63,7 @@ public class MetricGroupRegistrationTest {
 				return null;
 			}
 		});
-		
+
 		Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
 		assertEquals("gauge", TestReporter1.lastPassedName);
 
@@ -85,8 +89,11 @@ public class MetricGroupRegistrationTest {
 		registry.shutdown();
 	}
 
+	/**
+	 * Reporter that exposes the last name and metric instance it was notified of.
+	 */
 	public static class TestReporter1 extends TestReporter {
-		
+
 		public static Metric lastPassedMetric;
 		public static String lastPassedName;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 665abb1..7397c87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -24,22 +24,29 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link MetricGroup}.
+ */
 public class MetricGroupTest extends TestLogger {
 
 	private static final MetricRegistryConfiguration defaultMetricRegistryConfiguration = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
-	
+
 	private MetricRegistry registry;
 
 	private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry();
@@ -63,7 +70,7 @@ public class MetricGroupTest extends TestLogger {
 		String groupName = "sometestname";
 		MetricGroup subgroup1 = group.addGroup(groupName);
 		MetricGroup subgroup2 = group.addGroup(groupName);
-		
+
 		assertNotNull(subgroup1);
 		assertNotNull(subgroup2);
 		assertTrue(subgroup1 == subgroup2);
@@ -82,10 +89,12 @@ public class MetricGroupTest extends TestLogger {
 		group.counter("testcounter");
 		group.gauge("testgauge", new Gauge<Object>() {
 			@Override
-			public Object getValue() { return null; }
+			public Object getValue() {
+				return null;
+			}
 		});
 	}
-	
+
 	@Test
 	public void closedGroupCreatesClosedGroups() {
 		GenericMetricGroup group = new GenericMetricGroup(exceptionOnRegister,
@@ -94,7 +103,7 @@ public class MetricGroupTest extends TestLogger {
 
 		group.close();
 		assertTrue(group.isClosed());
-		
+
 		AbstractMetricGroup subgroup = (AbstractMetricGroup) group.addGroup("test subgroup");
 		assertTrue(subgroup.isClosed());
 	}
@@ -104,7 +113,7 @@ public class MetricGroupTest extends TestLogger {
 		final String name = "abctestname";
 		GenericMetricGroup group = new GenericMetricGroup(
 				registry, new DummyAbstractMetricGroup(registry), "testgroup");
-		
+
 		assertNotNull(group.counter(name));
 		assertNotNull(group.counter(name));
 	}
@@ -114,7 +123,7 @@ public class MetricGroupTest extends TestLogger {
 		final String name = "abctestname";
 		GenericMetricGroup group = new GenericMetricGroup(
 				registry, new DummyAbstractMetricGroup(registry), "testgroup");
-		
+
 		assertNotNull(group.addGroup(name));
 		assertNotNull(group.counter(name));
 	}
@@ -143,11 +152,11 @@ public class MetricGroupTest extends TestLogger {
 		assertEquals(vid.toString(), info2.vertexID);
 		assertEquals(4, info2.subtaskIndex);
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	private static class ExceptionOnRegisterRegistry extends MetricRegistry {
-		
+
 		public ExceptionOnRegisterRegistry() {
 			super(defaultMetricRegistryConfiguration);
 		}
@@ -164,7 +173,7 @@ public class MetricGroupTest extends TestLogger {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	private static class DummyAbstractMetricGroup extends AbstractMetricGroup {
 
 		public DummyAbstractMetricGroup(MetricRegistry registry) {
@@ -182,7 +191,8 @@ public class MetricGroupTest extends TestLogger {
 		}
 
 		@Override
-		protected void addMetric(String name, Metric metric) {}
+		protected void addMetric(String name, Metric metric) {
+		}
 
 		@Override
 		public MetricGroup addGroup(String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index af73c27..c232cf2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.Map;
@@ -34,6 +35,9 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Tests for the {@link OperatorMetricGroup}.
+ */
 public class OperatorGroupTest extends TestLogger {
 
 	@Test
@@ -81,7 +85,7 @@ public class OperatorGroupTest extends TestLogger {
 		JobID jid = new JobID();
 		AbstractID tid = new AbstractID();
 		AbstractID eid = new AbstractID();
-		
+
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
 		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
 		TaskMetricGroup taskGroup = new TaskMetricGroup(

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
index 1ff804a..f6a1277 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
@@ -15,20 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link QueryScopeInfo} classes.
+ */
 public class QueryScopeInfoTest {
 	@Test
 	public void testJobManagerQueryScopeInfo() {
 		QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo();
 		assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
 		assertEquals("", info.scope);
-		
+
 		info = info.copy("world");
 		assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
 		assertEquals("world", info.scope);
@@ -53,7 +58,7 @@ public class QueryScopeInfoTest {
 		assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
 		assertEquals("world", info.scope);
 		assertEquals("tmid", info.taskManagerID);
-		
+
 		info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "hello");
 		assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
 		assertEquals("hello", info.scope);

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 564a518..bcf77de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -15,17 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Tests for the {@link TaskIOMetricGroup}.
+ */
 public class TaskIOMetricGroupTest {
 	@Test
 	public void testTaskIOMetricGroup() {
@@ -50,7 +55,7 @@ public class TaskIOMetricGroupTest {
 		taskIO.getNumBytesInLocalCounter().inc(100L);
 		taskIO.getNumBytesInRemoteCounter().inc(150L);
 		taskIO.getNumBytesOutCounter().inc(250L);
-		
+
 		IOMetrics io = taskIO.createSnapshot();
 		assertEquals(32L, io.getNumRecordsIn());
 		assertEquals(64L, io.getNumRecordsOut());


[5/6] flink git commit: [FLINK-6781] Make statement fetch size configurable in JDBCInputFormat.

Posted by ch...@apache.org.
[FLINK-6781] Make statement fetch size configurable in JDBCInputFormat.

This closes #4036.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5605107b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5605107b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5605107b

Branch: refs/heads/master
Commit: 5605107bd59f10fc9ca7d9ac53fa8d60ecdcfdbc
Parents: c041dd8
Author: Maximilian Bode <ma...@tngtech.com>
Authored: Wed May 31 18:46:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 17 ++++++++
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 43 ++++++++++++++++++++
 2 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index 835fb23..b7ac744 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
@@ -30,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +115,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 	private transient Connection dbConn;
 	private transient PreparedStatement statement;
 	private transient ResultSet resultSet;
+	private int fetchSize;
 
 	private boolean hasNext;
 	private Object[][] parameterValues;
@@ -141,6 +144,9 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 				dbConn = DriverManager.getConnection(dbURL, username, password);
 			}
 			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+			if (fetchSize > 0) {
+				statement.setFetchSize(fetchSize);
+			}
 		} catch (SQLException se) {
 			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
 		} catch (ClassNotFoundException cnfe) {
@@ -312,6 +318,11 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 		return new DefaultInputSplitAssigner(inputSplits);
 	}
 
+	@VisibleForTesting
+	PreparedStatement getStatement() {
+		return statement;
+	}
+
 	/**
 	 * A builder used to set parameters to the output format's configuration in a fluent way.
 	 * @return builder
@@ -378,6 +389,12 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 			return this;
 		}
 
+		public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
+			Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize);
+			format.fetchSize = fetchSize;
+			return this;
+		}
+
 		public JDBCInputFormat finish() {
 			if (format.username == null) {
 				LOG.info("Username was not supplied separately.");

http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index b1416ea..f7a86e5 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -30,7 +30,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 
 /**
  * Tests for the {@link JDBCInputFormat}.
@@ -100,6 +102,47 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.finish();
 	}
 
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidFetchSize() {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.setFetchSize(-7)
+			.finish();
+	}
+
+	@Test
+	public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.finish();
+		jdbcInputFormat.openInputFormat();
+
+		Class.forName(DRIVER_CLASS);
+		final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize();
+
+		Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+	}
+
+	@Test
+	public void testFetchSizeCanBeConfigured() throws SQLException {
+		final int desiredFetchSize = 10_000;
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.setFetchSize(desiredFetchSize)
+			.finish();
+		jdbcInputFormat.openInputFormat();
+		Assert.assertEquals(desiredFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+	}
+
 	@Test
 	public void testJDBCInputFormatWithoutParallelism() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()