You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/02 13:14:19 UTC
[1/6] flink git commit: [hotfix] Update DataSet docs concerning
JDBCInputFormat.
Repository: flink
Updated Branches:
refs/heads/master cd5b4a630 -> 94ade3de9
[hotfix] Update DataSet docs concerning JDBCInputFormat.
The way to provide type information was outdated.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94ade3de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94ade3de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94ade3de
Branch: refs/heads/master
Commit: 94ade3de9f7345820e869940ced40bce3e9fae38
Parents: 5605107
Author: Maximilian Bode <ma...@tngtech.com>
Authored: Wed May 31 18:48:51 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200
----------------------------------------------------------------------
docs/dev/batch/index.md | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/94ade3de/docs/dev/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index 6fccc04..9a8ce22 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -890,14 +890,12 @@ DataSet<Long> numbers = env.generateSequence(1, 10000000);
// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
env.createInput(
- // create and configure input format
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("select name, age from persons")
- .finish(),
- // specify type information for DataSet
- new TupleTypeInfo(Tuple2.class, STRING_TYPE_INFO, INT_TYPE_INFO)
+ .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+ .finish()
);
// Note: Flink's program compiler needs to infer the data types of the data items which are returned
[3/6] flink git commit: [FLINK-6793] Activate checkstyle for
runtime/metrics
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 11f3c0f..bd85303 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -39,6 +39,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link TaskManagerMetricGroup}.
+ */
public class TaskManagerGroupTest extends TestLogger {
// ------------------------------------------------------------------------
@@ -74,24 +77,24 @@ public class TaskManagerGroupTest extends TestLogger {
jid1, jobName1, vertex12, execution12, "test", 13, 1);
TaskMetricGroup tmGroup21 = group.addTaskForJob(
jid2, jobName2, vertex21, execution21, "test", 7, 2);
-
+
assertEquals(2, group.numRegisteredJobMetricGroups());
assertFalse(tmGroup11.parent().isClosed());
assertFalse(tmGroup12.parent().isClosed());
assertFalse(tmGroup21.parent().isClosed());
-
+
// close all for job 2 and one from job 1
tmGroup11.close();
tmGroup21.close();
assertTrue(tmGroup11.isClosed());
assertTrue(tmGroup21.isClosed());
-
+
// job 2 should be removed, job should still be there
assertFalse(tmGroup11.parent().isClosed());
assertFalse(tmGroup12.parent().isClosed());
assertTrue(tmGroup21.parent().isClosed());
assertEquals(1, group.numRegisteredJobMetricGroups());
-
+
// add one more to job one
TaskMetricGroup tmGroup13 = group.addTaskForJob(
jid1, jobName1, vertex13, execution13, "test", 0, 0);
@@ -101,7 +104,7 @@ public class TaskManagerGroupTest extends TestLogger {
assertTrue(tmGroup11.parent().isClosed());
assertTrue(tmGroup12.parent().isClosed());
assertTrue(tmGroup13.parent().isClosed());
-
+
assertEquals(0, group.numRegisteredJobMetricGroups());
registry.shutdown();
@@ -111,8 +114,7 @@ public class TaskManagerGroupTest extends TestLogger {
public void testCloseClosesAll() throws IOException {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
- registry, "localhost", new AbstractID().toString());
-
+ registry, "localhost", new AbstractID().toString());
final JobID jid1 = new JobID();
final JobID jid2 = new JobID();
@@ -136,14 +138,14 @@ public class TaskManagerGroupTest extends TestLogger {
jid2, jobName2, vertex21, execution21, "test", 7, 1);
group.close();
-
+
assertTrue(tmGroup11.isClosed());
assertTrue(tmGroup12.isClosed());
assertTrue(tmGroup21.isClosed());
registry.shutdown();
}
-
+
// ------------------------------------------------------------------------
// scope name tests
// ------------------------------------------------------------------------
@@ -153,7 +155,7 @@ public class TaskManagerGroupTest extends TestLogger {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id");
- assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents());
+ assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents());
assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name"));
registry.shutdown();
}
@@ -165,7 +167,7 @@ public class TaskManagerGroupTest extends TestLogger {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
- assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents());
+ assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name"));
registry.shutdown();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index f9891cb..fc0ce5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -22,16 +22,19 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.MetricRegistry;
-
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the {@link TaskManagerJobMetricGroup}.
+ */
public class TaskManagerJobGroupTest extends TestLogger {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 183237a..22b0a1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -28,12 +28,16 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link TaskMetricGroup}.
+ */
public class TaskMetricGroupTest extends TestLogger {
// ------------------------------------------------------------------------
@@ -51,7 +55,7 @@ public class TaskMetricGroupTest extends TestLogger {
TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2);
assertArrayEquals(
- new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "aTaskName", "13"},
+ new String[]{"theHostName", "taskmanager", "test-tm-id", "myJobName", "aTaskName", "13"},
taskGroup.getScopeComponents());
assertEquals(
@@ -78,7 +82,7 @@ public class TaskMetricGroupTest extends TestLogger {
registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2);
assertArrayEquals(
- new String[] { "test-tm-id", jid.toString(), vertexId.toString(), executionId.toString() },
+ new String[]{"test-tm-id", jid.toString(), vertexId.toString(), executionId.toString()},
taskGroup.getScopeComponents());
assertEquals(
@@ -102,7 +106,7 @@ public class TaskMetricGroupTest extends TestLogger {
registry, jmGroup, new AbstractID(), executionId, "aTaskName", 13, 1);
assertArrayEquals(
- new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13" },
+ new String[]{"theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13"},
taskGroup.getScopeComponents());
assertEquals(
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
index 601f734..15bf718 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
@@ -15,10 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.util;
import org.apache.flink.metrics.CharacterFilter;
+/**
+ * A {@link CharacterFilter} that returns the given string without any modification.
+ */
public class DummyCharacterFilter implements CharacterFilter {
@Override
public String filterCharacters(String input) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
index cd96c60..e5e09d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestReporter.java
@@ -23,6 +23,9 @@ import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.AbstractReporter;
+/**
+ * No-op reporter implementation.
+ */
public class TestReporter extends AbstractReporter {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
index 82f8504..8565ed3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
@@ -15,11 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.util;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
+/**
+ * Stateless test histogram for which all methods return a static value.
+ */
public class TestingHistogram implements Histogram {
@Override
[2/6] flink git commit: [FLINK-6795] Activate checkstyle for
runtime/process
Posted by ch...@apache.org.
[FLINK-6795] Activate checkstyle for runtime/process
This closes #4040.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c041dd81
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c041dd81
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c041dd81
Branch: refs/heads/master
Commit: c041dd81f66b5f53ae8e1911016949eb4c3ff530
Parents: 00a2666
Author: zentol <ch...@apache.org>
Authored: Thu Jun 1 12:25:42 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../main/java/org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c041dd81/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 0f8e2e9..38ab32b 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -450,7 +450,6 @@ under the License.
**/runtime/net/**,
**/runtime/operators/**,
**/runtime/plugable/**,
- **/runtime/process/**,
**/runtime/query/**,
**/runtime/registration/**,
**/runtime/resourcemanager/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/c041dd81/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index 09e1839..77ec3eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -50,7 +50,7 @@ public class ProcessReaper extends UntypedActor {
String name = term.actor().path().toSerializationFormat();
if (log != null) {
log.error("Actor " + name + " terminated, stopping process...");
-
+
// give the log some time to reach disk
try {
Thread.sleep(100);
[6/6] flink git commit: [FLINK-6794] Activate checkstyle for
migration/**
Posted by ch...@apache.org.
[FLINK-6794] Activate checkstyle for migration/**
This closes #4038.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00a26668
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00a26668
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00a26668
Branch: refs/heads/master
Commit: 00a266680c8e962fb158472821e1d78660385a2e
Parents: 2b3d284
Author: zentol <ch...@apache.org>
Authored: Thu Jun 1 12:33:09 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 5 -----
.../src/main/java/org/apache/flink/migration/MigrationUtil.java | 3 +++
.../migration/streaming/runtime/tasks/StreamTaskState.java | 5 ++---
.../migration/streaming/runtime/tasks/StreamTaskStateList.java | 2 +-
4 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d83d671..0f8e2e9 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -420,11 +420,6 @@ under the License.
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
<excludes>
- **/migration/api/**,
- **/migration/runtime/**,
- **/migration/state/**,
- **/migration/streaming/**,
- **/migration/*,
**/runtime/akka/**,
**/runtime/blob/**,
**/runtime/broadcast/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
index a4e3a2e..a6055a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java
@@ -23,6 +23,9 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
import java.util.Collection;
+/**
+ * Utility functions for migration.
+ */
public class MigrationUtil {
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
index 7a2aab9..b044ffb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
@@ -36,7 +36,7 @@ import java.util.HashMap;
public class StreamTaskState implements Serializable, Closeable {
private static final long serialVersionUID = 1L;
-
+
private StateHandle<?> operatorState;
private StateHandle<Serializable> functionState;
@@ -74,14 +74,13 @@ public class StreamTaskState implements Serializable, Closeable {
/**
* Checks if this state object actually contains any state, or if all of the state
* fields are null.
- *
+ *
* @return True, if all state is null, false if at least one state is not null.
*/
public boolean isEmpty() {
return operatorState == null & functionState == null & kvStates == null;
}
-
@Override
public void close() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/00a26668/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
index c2357f7..7643039 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
@@ -35,7 +35,7 @@ public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
private static final long serialVersionUID = 1L;
- /** The states for all operator */
+ /** The states for all operator. */
private final StreamTaskState[] states;
public StreamTaskStateList(StreamTaskState[] states) throws Exception {
[4/6] flink git commit: [FLINK-6793] Activate checkstyle for
runtime/metrics
Posted by ch...@apache.org.
[FLINK-6793] Activate checkstyle for runtime/metrics
This closes #4037.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b3d284b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b3d284b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b3d284b
Branch: refs/heads/master
Commit: 2b3d284bf09a30edafa4dadf50492156bb47027b
Parents: cd5b4a6
Author: zentol <ch...@apache.org>
Authored: Wed May 31 16:39:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../flink/runtime/metrics/MetricNames.java | 4 ++
.../flink/runtime/metrics/MetricRegistry.java | 22 ++++---
.../metrics/MetricRegistryConfiguration.java | 11 ++--
.../flink/runtime/metrics/ViewUpdater.java | 1 +
.../flink/runtime/metrics/dump/MetricDump.java | 4 +-
.../metrics/dump/MetricDumpSerialization.java | 31 ++++++----
.../metrics/dump/MetricQueryService.java | 16 ++---
.../runtime/metrics/dump/QueryScopeInfo.java | 5 +-
.../metrics/groups/AbstractMetricGroup.java | 52 ++++++++--------
.../metrics/groups/ComponentMetricGroup.java | 8 +--
.../metrics/groups/GenericMetricGroup.java | 2 +-
.../groups/JobManagerJobMetricGroup.java | 2 +
.../metrics/groups/JobManagerMetricGroup.java | 1 +
.../runtime/metrics/groups/JobMetricGroup.java | 9 +--
.../metrics/groups/OperatorIOMetricGroup.java | 3 +-
.../metrics/groups/OperatorMetricGroup.java | 4 +-
.../metrics/groups/TaskIOMetricGroup.java | 4 +-
.../groups/TaskManagerJobMetricGroup.java | 4 +-
.../metrics/groups/TaskManagerMetricGroup.java | 1 -
.../runtime/metrics/groups/TaskMetricGroup.java | 9 +--
.../runtime/metrics/scope/ScopeFormat.java | 18 +++---
.../runtime/metrics/scope/ScopeFormats.java | 10 ++-
.../flink/runtime/metrics/util/MetricUtils.java | 7 ++-
.../runtime/metrics/MetricRegistryTest.java | 65 +++++++++++++++-----
.../runtime/metrics/TaskManagerMetricsTest.java | 23 ++++---
.../metrics/dump/MetricDumpSerializerTest.java | 22 ++++---
.../runtime/metrics/dump/MetricDumpTest.java | 4 ++
.../metrics/dump/MetricQueryServiceTest.java | 15 +++--
.../metrics/dump/QueryScopeInfoTest.java | 4 ++
.../metrics/groups/AbstractMetricGroupTest.java | 16 ++++-
.../metrics/groups/JobManagerGroupTest.java | 5 ++
.../metrics/groups/JobManagerJobGroupTest.java | 4 ++
.../groups/MetricGroupRegistrationTest.java | 13 +++-
.../runtime/metrics/groups/MetricGroupTest.java | 38 +++++++-----
.../metrics/groups/OperatorGroupTest.java | 6 +-
.../metrics/groups/QueryScopeInfoTest.java | 9 ++-
.../metrics/groups/TaskIOMetricGroupTest.java | 7 ++-
.../metrics/groups/TaskManagerGroupTest.java | 24 ++++----
.../metrics/groups/TaskManagerJobGroupTest.java | 5 +-
.../metrics/groups/TaskMetricGroupTest.java | 10 ++-
.../metrics/util/DummyCharacterFilter.java | 4 ++
.../runtime/metrics/util/TestReporter.java | 3 +
.../runtime/metrics/util/TestingHistogram.java | 4 ++
44 files changed, 331 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 2272bc7..d83d671 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -451,7 +451,6 @@ under the License.
**/runtime/leaderretrieval/**,
**/runtime/memory/**,
**/runtime/messages/**,
- **/runtime/metrics/**,
**/runtime/minicluster/**,
**/runtime/net/**,
**/runtime/operators/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 9202ca1..300b4b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -15,8 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics;
+/**
+ * Collection of metric names.
+ */
public class MetricNames {
private MetricNames() {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 9f46d47..5018cbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,10 +18,6 @@
package org.apache.flink.runtime.metrics;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.pattern.Patterns;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
@@ -38,11 +34,13 @@ import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.List;
@@ -53,6 +51,10 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
/**
* A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
* connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
@@ -154,7 +156,7 @@ public class MetricRegistry {
/**
* Initializes the MetricQueryService.
- *
+ *
* @param actorSystem ActorSystem to create the MetricQueryService on
* @param resourceID resource ID used to disambiguate the actor name
*/
@@ -250,7 +252,7 @@ public class MetricRegistry {
}
}
}
-
+
private void shutdownExecutor() {
if (executor != null) {
executor.shutdown();
@@ -361,7 +363,7 @@ public class MetricRegistry {
* This task is explicitly a static class, so that it does not hold any references to the enclosing
* MetricsRegistry instance.
*
- * This is a subtle difference, but very important: With this static class, the enclosing class instance
+ * <p>This is a subtle difference, but very important: With this static class, the enclosing class instance
* may become garbage-collectible, whereas with an anonymous inner class, the timer thread
* (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer.
* Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible,
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 3475f04..6f6db86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class MetricRegistryConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(MetricRegistryConfiguration.class);
- private static volatile MetricRegistryConfiguration DEFAULT_CONFIGURATION;
+ private static volatile MetricRegistryConfiguration defaultConfiguration;
// regex pattern to split the defined reporters
private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*");
@@ -148,15 +149,15 @@ public class MetricRegistryConfiguration {
public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
// create the default metric registry configuration only once
- if (DEFAULT_CONFIGURATION == null) {
+ if (defaultConfiguration == null) {
synchronized (MetricRegistryConfiguration.class) {
- if (DEFAULT_CONFIGURATION == null) {
- DEFAULT_CONFIGURATION = fromConfiguration(new Configuration());
+ if (defaultConfiguration == null) {
+ defaultConfiguration = fromConfiguration(new Configuration());
}
}
}
- return DEFAULT_CONFIGURATION;
+ return defaultConfiguration;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
index e4d0596..77bd0d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics;
import org.apache.flink.metrics.View;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
index 2239b50..202e453 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
import org.apache.flink.util.Preconditions;
@@ -121,7 +122,7 @@ public abstract class MetricDump {
}
/**
- * Container for the rate of a {@link org.apache.flink.metrics.Meter}.
+ * Container for the rate of a {@link org.apache.flink.metrics.Meter}.
*/
public static class MeterDump extends MetricDump {
public final double rate;
@@ -130,6 +131,7 @@ public abstract class MetricDump {
super(scopeInfo, name);
this.rate = rate;
}
+
@Override
public byte getCategory() {
return METRIC_CATEGORY_METER;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index e57a0d8..e173522 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -27,6 +28,7 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +58,14 @@ public class MetricDumpSerialization {
/**
* This class encapsulates all serialized metrics and a count for each metric type.
- *
- * The counts are stored separately from the metrics since the final count for any given type can only be
+ *
+ * <p>The counts are stored separately from the metrics since the final count for any given type can only be
* determined after all metrics of that type were serialized. Storing them together in a single byte[] would
* require an additional copy of all serialized metrics, as you would first have to serialize the metrics into a
* temporary buffer to calculate the counts, write the counts to the final output and copy all metrics from the
* temporary buffer.
- *
- * Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag)
+ *
+ * <p>Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag)
* is written for each metric, this would require more bandwidth due to the sheer number of metrics.
*/
public static class MetricSerializationResult implements Serializable {
@@ -75,12 +77,12 @@ public class MetricDumpSerialization {
public final int numGauges;
public final int numMeters;
public final int numHistograms;
-
+
public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
Preconditions.checkNotNull(serializedMetrics);
Preconditions.checkArgument(numCounters >= 0);
Preconditions.checkArgument(numGauges >= 0);
- Preconditions.checkArgument(numMeters >= 0);
+ Preconditions.checkArgument(numMeters >= 0);
Preconditions.checkArgument(numHistograms >= 0);
this.serializedMetrics = serializedMetrics;
this.numCounters = numCounters;
@@ -94,18 +96,21 @@ public class MetricDumpSerialization {
// Serialization
//-------------------------------------------------------------------------
+ /**
+ * Serializes a set of metrics into a {@link MetricSerializationResult}.
+ */
public static class MetricDumpSerializer {
private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
/**
* Serializes the given metrics and returns the resulting byte array.
- *
- * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
+ *
+ * <p>Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
* {@link MetricSerializationResult}.
- *
- * If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult}
- * is partially corrupted. Such a result can be deserialized safely by
+ *
+ * <p>If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult}
+ * is partially corrupted. Such a result can be deserialized safely by
* {@link MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only metrics that were
* fully serialized before the failure will be returned.
*
@@ -263,6 +268,9 @@ public class MetricDumpSerialization {
// Deserialization
//-------------------------------------------------------------------------
+ /**
+ * Deserializer for reading a list of {@link MetricDump MetricDumps} from a {@link MetricSerializationResult}.
+ */
public static class MetricDumpDeserializer {
/**
* De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}.
@@ -311,7 +319,6 @@ public class MetricDumpSerialization {
}
}
-
private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
QueryScopeInfo scope = deserializeMetricInfo(dis);
String name = dis.readUTF();
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 2229139..8821e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -15,13 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
@@ -31,6 +27,12 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +46,7 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
/**
* The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
*
- * It is realized as an actor and can be notified of
+ * <p>It is realized as an actor and can be notified of
* - an added metric by calling {@link MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, AbstractMetricGroup)}
* - a removed metric by calling {@link MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)}
* - a metric dump request by sending the return value of {@link MetricQueryService#getCreateDump()}
@@ -217,6 +219,6 @@ public class MetricQueryService extends UntypedActor {
}
private static class CreateDump implements Serializable {
- private static CreateDump INSTANCE = new CreateDump();
+ private static final CreateDump INSTANCE = new CreateDump();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index 6572ca0..9af9d78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
/**
@@ -28,7 +29,7 @@ public abstract class QueryScopeInfo {
public static final byte INFO_CATEGORY_TASK = 3;
public static final byte INFO_CATEGORY_OPERATOR = 4;
- /** The remaining scope not covered by specific fields */
+ /** The remaining scope not covered by specific fields. */
public final String scope;
private QueryScopeInfo(String scope) {
@@ -45,7 +46,7 @@ public abstract class QueryScopeInfo {
/**
* Returns the category for this QueryScopeInfo.
- *
+ *
* @return category
*/
public abstract byte getCategory();
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index a19970d..c67c5ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +42,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups.
- *
+ *
* <p><b>IMPORTANT IMPLEMENTATION NOTE</b>
- *
+ *
* <p>This class uses locks for adding and removing metrics objects. This is done to
* prevent resource leaks in the presence of concurrently closing a group and adding
* metrics and subgroups.
@@ -56,30 +57,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* from any metrics reporter and any internal maps. Note that even closed metrics groups
* return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
* These metrics simply do not get reported any more, when created on a closed group.
- *
+ *
* @param <A> The type of the parent MetricGroup
*/
@Internal
public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup {
- /** shared logger */
private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
// ------------------------------------------------------------------------
- /** The parent group containing this group */
+ /** The parent group containing this group. */
protected final A parent;
/** The map containing all variables and their associated values, lazily computed. */
protected volatile Map<String, String> variables;
-
- /** The registry that this metrics group belongs to */
+
+ /** The registry that this metrics group belongs to. */
protected final MetricRegistry registry;
- /** All metrics that are directly contained in this group */
+ /** All metrics that are directly contained in this group. */
private final Map<String, Metric> metrics = new HashMap<>();
- /** All metric subgroups of this group */
+ /** All metric subgroups of this group. */
private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
/** The metrics scope represented by this group.
@@ -97,7 +97,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/** The metrics query service scope represented by this group, lazily computed. */
protected QueryScopeInfo queryServiceScopeInfo;
- /** Flag indicating whether this group has been closed */
+ /** Flag indicating whether this group has been closed. */
private volatile boolean closed;
// ------------------------------------------------------------------------
@@ -111,7 +111,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
public Map<String, String> getAllVariables() {
if (variables == null) { // avoid synchronization for common case
- synchronized(this) {
+ synchronized (this) {
if (variables == null) {
if (parent != null) {
variables = parent.getAllVariables();
@@ -126,7 +126,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Returns the logical scope of this group, for example
- * {@code "taskmanager.job.task"}
+ * {@code "taskmanager.job.task"}.
*
* @param filter character filter which is applied to the scope components
* @return logical scope
@@ -137,7 +137,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Returns the logical scope of this group, for example
- * {@code "taskmanager.job.task"}
+ * {@code "taskmanager.job.task"}.
*
* @param filter character filter which is applied to the scope components
* @return logical scope
@@ -155,7 +155,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Returns the name for this group, meaning what kind of entity it represents, for example "taskmanager".
- *
+ *
* @param filter character filter which is applied to the name
* @return logical name for this group
*/
@@ -163,9 +163,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Gets the scope as an array of the scope components, for example
- * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
- *
- * @see #getMetricIdentifier(String)
+ * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}.
+ *
+ * @see #getMetricIdentifier(String)
*/
public String[] getScopeComponents() {
return scopeComponents;
@@ -173,7 +173,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Returns the metric query service scope for this group.
- *
+ *
* @param filter character filter
* @return query service scope
*/
@@ -194,8 +194,8 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Returns the fully qualified metric name, for example
- * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
- *
+ * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
+ *
* @param metricName metric name
* @return fully qualified metric name
*/
@@ -205,7 +205,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Returns the fully qualified metric name, for example
- * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+ * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
*
* @param metricName metric name
* @param filter character filter which is applied to the scope components if not null.
@@ -217,7 +217,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Returns the fully qualified metric name using the configured delimiter for the reporter with the given index, for example
- * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+ * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
*
* @param metricName metric name
* @param filter character filter which is applied to the scope components if not null.
@@ -250,7 +250,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
return scopeStrings[reporterIndex] + delimiter + metricName;
}
}
-
+
// ------------------------------------------------------------------------
// Closing
// ------------------------------------------------------------------------
@@ -292,7 +292,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
public Counter counter(String name) {
return counter(name, new SimpleCounter());
}
-
+
@Override
public <C extends Counter> C counter(int name, C counter) {
return counter(String.valueOf(name), counter);
@@ -340,7 +340,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
/**
* Adds the given metric to the group and registers it at the registry, if the group
* is not yet closed, and if no metric with the same name has been registered before.
- *
+ *
* @param name the name to register the metric under
* @param metric the metric to register
*/
@@ -372,7 +372,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
else {
// we had a collision. put back the original value
metrics.put(name, prior);
-
+
// we warn here, rather than failing, because metrics are tools that should not fail the
// program when used incorrectly
LOG.warn("Name collision: Group already contains a Metric with the name '" +
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
index 9f0f483..0cd9942 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
@@ -25,9 +25,9 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g.,
+ * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g.,
* TaskManager, Job, Task, Operator).
- *
+ *
* <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. For example
* the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a
* fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope.
@@ -36,7 +36,7 @@ import java.util.Map;
* certain identifiers from the scope. The scope for metrics belonging to the "Task"
* group could for example include the task attempt number (more fine grained identification), or
* exclude it (for continuity of the namespace across failure and recovery).
- *
+ *
* @param <P> The type of the parent MetricGroup.
*/
@Internal
@@ -101,7 +101,7 @@ public abstract class ComponentMetricGroup<P extends AbstractMetricGroup<?>> ext
/**
* Gets all component metric groups that are contained in this component metric group.
- *
+ *
* @return All component metric groups that are contained in this component metric group.
*/
protected abstract Iterable<? extends ComponentMetricGroup> subComponents();
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
index 5978f2d..ee7d1ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
*/
@Internal
public class GenericMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> {
- /** The name of this group */
+ /** The name of this group. */
private String name;
public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
index c4902e8..b62c7b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
@@ -22,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import javax.annotation.Nullable;
+
import java.util.Collections;
import static org.apache.flink.util.Preconditions.checkNotNull;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index 5a35110..e09051d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
index 17f6189..876e794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -26,21 +26,22 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import javax.annotation.Nullable;
+
import java.util.Map;
/**
* Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
* a specific job.
- *
+ *
* @param <C> The type of the parent ComponentMetricGroup.
*/
@Internal
public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends ComponentMetricGroup<C> {
- /** The ID of the job represented by this metrics group */
+ /** The ID of the job represented by this metrics group. */
protected final JobID jobId;
- /** The name of the job represented by this metrics group */
+ /** The name of the job represented by this metrics group. */
@Nullable
protected final String jobName;
@@ -53,7 +54,7 @@ public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends
@Nullable String jobName,
String[] scope) {
super(registry, scope, parent);
-
+
this.jobId = jobId;
this.jobName = jobName;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 5bf7d1f..6bf3c6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
@@ -64,7 +65,7 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
public void reuseInputMetricsForTask() {
TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
taskIO.reuseRecordsInputCounter(this.numRecordsIn);
-
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index 37c9dd8..2313873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -46,7 +46,7 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
}
// ------------------------------------------------------------------------
-
+
public final TaskMetricGroup parent() {
return parent;
}
@@ -68,7 +68,7 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
public OperatorIOMetricGroup getIOMetricGroup() {
return ioMetrics;
}
-
+
// ------------------------------------------------------------------------
// Component Metric Group Specifics
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 38accad..e12ecd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -24,11 +24,11 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
import java.util.ArrayList;
import java.util.List;
@@ -110,7 +110,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
// ============================================================================================
/**
- * Initialize Buffer Metrics for a task
+ * Initialize Buffer Metrics for a task.
*/
public void initializeBufferMetrics(Task task) {
final MetricGroup buffers = addGroup("buffers");
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 79a87d0..3921553 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.annotation.Internal;
@@ -25,6 +26,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.util.AbstractID;
import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.Map;
@@ -39,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricGroup> {
- /** Map from execution attempt ID (task identifier) to task metrics */
+ /** Map from execution attempt ID (task identifier) to task metrics. */
private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index 92c509a..d85e868 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -46,7 +46,6 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
private final String taskManagerId;
-
public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) {
super(registry, registry.getScopeFormats().getTaskManagerFormat().formatScope(hostname, taskManagerId), null);
this.hostname = hostname;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 43e8e1b..cb7aaa0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.util.AbstractID;
import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.Map;
@@ -33,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink runtime Task.
- *
+ *
* <p>Contains extra logic for adding operators.
*/
@Internal
@@ -42,13 +43,13 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
private final TaskIOMetricGroup ioMetrics;
-
- /** The execution Id uniquely identifying the executed task represented by this metrics group */
+
+ /** The execution Id uniquely identifying the executed task represented by this metrics group. */
private final AbstractID executionId;
@Nullable
protected final AbstractID vertexId;
-
+
@Nullable
private final String taskName;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index f9efb88..18b3a0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -52,7 +52,7 @@ public abstract class ScopeFormat {
/**
* If the scope format starts with this character, then the parent components scope
* format will be used as a prefix.
- *
+ *
* <p>For example, if the TaskManager's job format is {@code "*.<job_name>"}, and the
* TaskManager format is {@code "<host>"}, then the job's metrics
* will have {@code "<host>.<job_name>"} as their scope.
@@ -90,16 +90,16 @@ public abstract class ScopeFormat {
// ----- Operator ----
public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name");
-
+
// ------------------------------------------------------------------------
// Scope Format Base
// ------------------------------------------------------------------------
- /** The scope format */
+ /** The scope format. */
private final String format;
- /** The format, split into components */
+ /** The format, split into components. */
private final String[] template;
private final int[] templatePos;
@@ -125,7 +125,7 @@ public abstract class ScopeFormat {
String[] parentTemplate = parent.template;
int parentLen = parentTemplate.length;
-
+
this.template = new String[parentLen + rawComponents.length - 1];
System.arraycopy(parentTemplate, 0, this.template, 0, parentLen);
System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1);
@@ -137,14 +137,14 @@ public abstract class ScopeFormat {
// --- compute the replacement matrix ---
// a bit of clumsy Java collections code ;-)
-
+
HashMap<String, Integer> varToValuePos = arrayToMap(variables);
List<Integer> templatePos = new ArrayList<>();
List<Integer> valuePos = new ArrayList<>();
for (int i = 0; i < template.length; i++) {
final String component = template[i];
-
+
// check if that is a variable
if (component != null && component.length() >= 3 &&
component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') {
@@ -188,7 +188,7 @@ public abstract class ScopeFormat {
public String toString() {
return "ScopeFormat '" + format + '\'';
}
-
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@@ -233,7 +233,7 @@ public abstract class ScopeFormat {
}
return sb.toString();
}
-
+
protected static String valueOrNull(Object value) {
return (value == null || (value instanceof String && ((String) value).isEmpty())) ?
"null" : value.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
index bde93be..dc49d32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -67,8 +67,7 @@ public class ScopeFormats {
String taskManagerFormat,
String taskManagerJobFormat,
String taskFormat,
- String operatorFormat)
- {
+ String operatorFormat) {
this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat);
this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat);
@@ -86,8 +85,7 @@ public class ScopeFormats {
TaskManagerScopeFormat taskManagerFormat,
TaskManagerJobScopeFormat taskManagerJobFormat,
TaskScopeFormat taskFormat,
- OperatorScopeFormat operatorFormat)
- {
+ OperatorScopeFormat operatorFormat) {
this.jobManagerFormat = checkNotNull(jobManagerFormat);
this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
this.taskManagerFormat = checkNotNull(taskManagerFormat);
@@ -129,8 +127,8 @@ public class ScopeFormats {
// ------------------------------------------------------------------------
/**
- * Creates the scope formats as defined in the given configuration
- *
+ * Creates the scope formats as defined in the given configuration.
+ *
* @param config The configuration that defines the formats
* @return The ScopeFormats parsed from the configuration
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 4612eaf..2ecde42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -15,12 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.util;
-import org.apache.commons.lang3.text.WordUtils;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+
+import org.apache.commons.lang3.text.WordUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +37,9 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
+/**
+ * Utility class to register pre-defined metric sets.
+ */
public class MetricUtils {
private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class);
private static final String METRIC_GROUP_STATUS_NAME = "Status";
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index b9502b2..1de2551 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.metrics;
-import akka.actor.ActorNotFound;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
@@ -34,20 +31,27 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.metrics.util.TestReporter;
-
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import org.junit.Assert;
import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for the {@link MetricRegistry}.
+ */
public class MetricRegistryTest extends TestLogger {
private static final char GLOBAL_DEFAULT_DELIMITER = '.';
@@ -55,14 +59,14 @@ public class MetricRegistryTest extends TestLogger {
@Test
public void testIsShutdown() {
MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
+
Assert.assertFalse(metricRegistry.isShutdown());
-
+
metricRegistry.shutdown();
-
+
Assert.assertTrue(metricRegistry.isShutdown());
}
-
+
/**
* Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
*/
@@ -82,6 +86,9 @@ public class MetricRegistryTest extends TestLogger {
metricRegistry.shutdown();
}
+ /**
+ * Reporter that exposes whether open() was called.
+ */
protected static class TestReporter1 extends TestReporter {
public static boolean wasOpened = false;
@@ -114,6 +121,9 @@ public class MetricRegistryTest extends TestLogger {
metricRegistry.shutdown();
}
+ /**
+ * Reporter that exposes whether open() was called.
+ */
protected static class TestReporter11 extends TestReporter {
public static boolean wasOpened = false;
@@ -123,6 +133,9 @@ public class MetricRegistryTest extends TestLogger {
}
}
+ /**
+ * Reporter that exposes whether open() was called.
+ */
protected static class TestReporter12 extends TestReporter {
public static boolean wasOpened = false;
@@ -132,6 +145,9 @@ public class MetricRegistryTest extends TestLogger {
}
}
+ /**
+ * Reporter that exposes whether open() was called.
+ */
protected static class TestReporter13 extends TestReporter {
public static boolean wasOpened = false;
@@ -156,6 +172,9 @@ public class MetricRegistryTest extends TestLogger {
new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
}
+ /**
+ * Reporter that verifies whether configured arguments were properly passed.
+ */
protected static class TestReporter2 extends TestReporter {
@Override
public void open(MetricConfig config) {
@@ -186,9 +205,9 @@ public class MetricRegistryTest extends TestLogger {
int reportCount = TestReporter3.reportCount;
long curT = System.currentTimeMillis();
/**
- * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports.
- * This value however does not not take the first triggered report into account (=> +1).
- * Furthermore we have to account for the mis-alignment between reports being triggered and our time
+ * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports.
+ * This value however does not not take the first triggered report into account (=> +1).
+ * Furthermore we have to account for the mis-alignment between reports being triggered and our time
* measurement (=> +1); for T=200 a total of 4-6 reports may have been
* triggered depending on whether the end of the interval for the first reports ends before
* or after T=50.
@@ -201,6 +220,9 @@ public class MetricRegistryTest extends TestLogger {
registry.shutdown();
}
+ /**
+ * Reporter that exposes how often report() was called.
+ */
protected static class TestReporter3 extends TestReporter implements Scheduled {
public static int reportCount = 0;
@@ -234,6 +256,9 @@ public class MetricRegistryTest extends TestLogger {
registry.shutdown();
}
+ /**
+ * Reporter that exposes whether it was notified of added or removed metrics.
+ */
protected static class TestReporter6 extends TestReporter {
public static boolean addCalled = false;
public static boolean removeCalled = false;
@@ -253,6 +278,9 @@ public class MetricRegistryTest extends TestLogger {
}
}
+ /**
+ * Reporter that exposes whether it was notified of added or removed metrics.
+ */
protected static class TestReporter7 extends TestReporter {
public static boolean addCalled = false;
public static boolean removeCalled = false;
@@ -344,10 +372,10 @@ public class MetricRegistryTest extends TestLogger {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
List<MetricReporter> reporters = registry.getReporters();
- ((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; //test1 reporter
- ((TestReporter8)reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
- ((TestReporter8)reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
- ((TestReporter8)reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
+ ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1 reporter
+ ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
+ ((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
+ ((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
group.counter("C");
@@ -383,6 +411,9 @@ public class MetricRegistryTest extends TestLogger {
}
}
+ /**
+ * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier.
+ */
public static class TestReporter8 extends TestReporter {
char expectedDelimiter;
public static int numCorrectDelimitersForRegister = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index d6fc48c..5e6bbce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -15,12 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.runtime.metrics;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
+package org.apache.flink.runtime.metrics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -34,18 +30,25 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManager;
-
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
import org.junit.Assert;
import org.junit.Test;
-import scala.concurrent.duration.FiniteDuration;
-
import java.net.InetAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests for the behavior of the metric system on a task manager.
+ */
public class TaskManagerMetricsTest extends TestLogger {
/**
@@ -78,7 +81,7 @@ public class TaskManagerMetricsTest extends TestLogger {
final Configuration config = new Configuration();
final ResourceID tmResourceID = ResourceID.generate();
- TaskManagerServicesConfiguration taskManagerServicesConfiguration =
+ TaskManagerServicesConfiguration taskManagerServicesConfiguration =
TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), false);
TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config);
@@ -87,7 +90,7 @@ public class TaskManagerMetricsTest extends TestLogger {
taskManagerServicesConfiguration, tmResourceID);
final MetricRegistry tmRegistry = taskManagerServices.getMetricRegistry();
-
+
// create the task manager
final Props tmProps = TaskManager.getTaskManagerProps(
TaskManager.class,
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 6e3d8f4..d36c469 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,6 +25,7 @@ import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
+
import org.junit.Assert;
import org.junit.Test;
@@ -43,7 +45,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
+/**
+ * Tests for the {@link MetricDumpSerialization}.
+ */
public class MetricDumpSerializerTest {
@Test
public void testNullGaugeHandling() throws IOException {
@@ -51,20 +55,20 @@ public class MetricDumpSerializerTest {
MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
-
+
gauges.put(new Gauge<Object>() {
@Override
public Object getValue() {
return null;
}
}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g"));
-
+
MetricDumpSerialization.MetricSerializationResult output = serializer.serialize(
- Collections.<Counter, Tuple2<QueryScopeInfo,String>>emptyMap(),
+ Collections.<Counter, Tuple2<QueryScopeInfo, String>>emptyMap(),
gauges,
Collections.<Histogram, Tuple2<QueryScopeInfo, String>>emptyMap(),
Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
-
+
// no metrics should be serialized
Assert.assertEquals(0, output.serializedMetrics.length);
@@ -80,10 +84,10 @@ public class MetricDumpSerializerTest {
final ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(serializer.serialize(
- new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(),
- new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(),
- new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(),
- new HashMap<Meter, Tuple2<QueryScopeInfo,String>>()));
+ new HashMap<Counter, Tuple2<QueryScopeInfo, String>>(),
+ new HashMap<Gauge<?>, Tuple2<QueryScopeInfo, String>>(),
+ new HashMap<Histogram, Tuple2<QueryScopeInfo, String>>(),
+ new HashMap<Meter, Tuple2<QueryScopeInfo, String>>()));
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
index 3b65184..c7b9793 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
import org.junit.Test;
@@ -25,6 +26,9 @@ import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_H
import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the {@link MetricDump} classes.
+ */
public class MetricDumpTest {
@Test
public void testDumpedCounter() {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 2243495..5c33ad6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -15,13 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.testkit.TestActorRef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -34,11 +30,20 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link MetricQueryService}.
+ */
public class MetricQueryServiceTest extends TestLogger {
@Test
public void testCreateDump() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 597e376..f4f9515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.dump;
import org.junit.Test;
@@ -26,6 +27,9 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY
import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the {@link QueryScopeInfo} classes.
+ */
public class QueryScopeInfoTest {
@Test
public void testJobManagerMetricInfo() {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 04e40ae..648ee47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.ConfigConstants;
@@ -28,11 +29,15 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.TestReporter;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link AbstractMetricGroup}.
+ */
public class AbstractMetricGroupTest {
/**
* Verifies that no {@link NullPointerException} is thrown when {@link AbstractMetricGroup#getAllVariables()} is
@@ -54,7 +59,7 @@ public class AbstractMetricGroupTest {
}
};
assertTrue(group.getAllVariables().isEmpty());
-
+
registry.shutdown();
}
@@ -119,11 +124,15 @@ public class AbstractMetricGroupTest {
}
+ /**
+ * Reporter that verifies the scope caching behavior.
+ */
public static class TestReporter1 extends ScopeCheckingTestReporter {
@Override
public String filterCharacters(String input) {
return FILTER_B.filterCharacters(input);
}
+
@Override
public void checkScopes(Metric metric, String metricName, MetricGroup group) {
// the first call determines which filter is applied to all future calls; in this case no filter is used at all
@@ -141,6 +150,9 @@ public class AbstractMetricGroupTest {
}
}
+ /**
+ * Reporter that verifies the scope caching behavior.
+ */
public static class TestReporter2 extends ScopeCheckingTestReporter {
@Override
public String filterCharacters(String input) {
@@ -173,7 +185,7 @@ public class AbstractMetricGroupTest {
try {
TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id");
assertEquals("MetricReporters list should be empty", 0, testRegistry.getReporters().size());
-
+
// default delimiter should be used
assertEquals("A.B.X.D.1", group.getMetricIdentifier("1", FILTER_C));
// no caching should occur
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 7834755..03341a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
@@ -26,12 +27,16 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link JobManagerMetricGroup}.
+ */
public class JobManagerGroupTest extends TestLogger {
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index d8bd57a..d734dfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -26,11 +26,15 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the {@link JobManagerJobMetricGroup}.
+ */
public class JobManagerJobGroupTest extends TestLogger {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index ace0236..56ce5fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.ConfigConstants;
@@ -33,8 +34,11 @@ import org.apache.flink.runtime.metrics.util.TestReporter;
import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the registration of groups and metrics on a {@link MetricGroup}.
+ */
public class MetricGroupRegistrationTest {
/**
* Verifies that group methods instantiate the correct metric with the given name.
@@ -59,7 +63,7 @@ public class MetricGroupRegistrationTest {
return null;
}
});
-
+
Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
assertEquals("gauge", TestReporter1.lastPassedName);
@@ -85,8 +89,11 @@ public class MetricGroupRegistrationTest {
registry.shutdown();
}
+ /**
+ * Reporter that exposes the last name and metric instance it was notified of.
+ */
public static class TestReporter1 extends TestReporter {
-
+
public static Metric lastPassedMetric;
public static String lastPassedName;
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 665abb1..7397c87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -24,22 +24,29 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
-
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Tests for the {@link MetricGroup}.
+ */
public class MetricGroupTest extends TestLogger {
private static final MetricRegistryConfiguration defaultMetricRegistryConfiguration = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
-
+
private MetricRegistry registry;
private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry();
@@ -63,7 +70,7 @@ public class MetricGroupTest extends TestLogger {
String groupName = "sometestname";
MetricGroup subgroup1 = group.addGroup(groupName);
MetricGroup subgroup2 = group.addGroup(groupName);
-
+
assertNotNull(subgroup1);
assertNotNull(subgroup2);
assertTrue(subgroup1 == subgroup2);
@@ -82,10 +89,12 @@ public class MetricGroupTest extends TestLogger {
group.counter("testcounter");
group.gauge("testgauge", new Gauge<Object>() {
@Override
- public Object getValue() { return null; }
+ public Object getValue() {
+ return null;
+ }
});
}
-
+
@Test
public void closedGroupCreatesClosedGroups() {
GenericMetricGroup group = new GenericMetricGroup(exceptionOnRegister,
@@ -94,7 +103,7 @@ public class MetricGroupTest extends TestLogger {
group.close();
assertTrue(group.isClosed());
-
+
AbstractMetricGroup subgroup = (AbstractMetricGroup) group.addGroup("test subgroup");
assertTrue(subgroup.isClosed());
}
@@ -104,7 +113,7 @@ public class MetricGroupTest extends TestLogger {
final String name = "abctestname";
GenericMetricGroup group = new GenericMetricGroup(
registry, new DummyAbstractMetricGroup(registry), "testgroup");
-
+
assertNotNull(group.counter(name));
assertNotNull(group.counter(name));
}
@@ -114,7 +123,7 @@ public class MetricGroupTest extends TestLogger {
final String name = "abctestname";
GenericMetricGroup group = new GenericMetricGroup(
registry, new DummyAbstractMetricGroup(registry), "testgroup");
-
+
assertNotNull(group.addGroup(name));
assertNotNull(group.counter(name));
}
@@ -143,11 +152,11 @@ public class MetricGroupTest extends TestLogger {
assertEquals(vid.toString(), info2.vertexID);
assertEquals(4, info2.subtaskIndex);
}
-
+
// ------------------------------------------------------------------------
-
+
private static class ExceptionOnRegisterRegistry extends MetricRegistry {
-
+
public ExceptionOnRegisterRegistry() {
super(defaultMetricRegistryConfiguration);
}
@@ -164,7 +173,7 @@ public class MetricGroupTest extends TestLogger {
}
// ------------------------------------------------------------------------
-
+
private static class DummyAbstractMetricGroup extends AbstractMetricGroup {
public DummyAbstractMetricGroup(MetricRegistry registry) {
@@ -182,7 +191,8 @@ public class MetricGroupTest extends TestLogger {
}
@Override
- protected void addMetric(String name, Metric metric) {}
+ protected void addMetric(String name, Metric metric) {
+ }
@Override
public MetricGroup addGroup(String name) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index af73c27..c232cf2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.util.Map;
@@ -34,6 +35,9 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+/**
+ * Tests for the {@link OperatorMetricGroup}.
+ */
public class OperatorGroupTest extends TestLogger {
@Test
@@ -81,7 +85,7 @@ public class OperatorGroupTest extends TestLogger {
JobID jid = new JobID();
AbstractID tid = new AbstractID();
AbstractID eid = new AbstractID();
-
+
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
index 1ff804a..f6a1277 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
@@ -15,20 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the {@link QueryScopeInfo} classes.
+ */
public class QueryScopeInfoTest {
@Test
public void testJobManagerQueryScopeInfo() {
QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo();
assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
assertEquals("", info.scope);
-
+
info = info.copy("world");
assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
assertEquals("world", info.scope);
@@ -53,7 +58,7 @@ public class QueryScopeInfoTest {
assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
assertEquals("world", info.scope);
assertEquals("tmid", info.taskManagerID);
-
+
info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "hello");
assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
assertEquals("hello", info.scope);
http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 564a518..bcf77de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -15,17 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+/**
+ * Tests for the {@link TaskIOMetricGroup}.
+ */
public class TaskIOMetricGroupTest {
@Test
public void testTaskIOMetricGroup() {
@@ -50,7 +55,7 @@ public class TaskIOMetricGroupTest {
taskIO.getNumBytesInLocalCounter().inc(100L);
taskIO.getNumBytesInRemoteCounter().inc(150L);
taskIO.getNumBytesOutCounter().inc(250L);
-
+
IOMetrics io = taskIO.createSnapshot();
assertEquals(32L, io.getNumRecordsIn());
assertEquals(64L, io.getNumRecordsOut());
[5/6] flink git commit: [FLINK-6781] Make statement fetch size
configurable in JDBCInputFormat.
Posted by ch...@apache.org.
[FLINK-6781] Make statement fetch size configurable in JDBCInputFormat.
This closes #4036.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5605107b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5605107b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5605107b
Branch: refs/heads/master
Commit: 5605107bd59f10fc9ca7d9ac53fa8d60ecdcfdbc
Parents: c041dd8
Author: Maximilian Bode <ma...@tngtech.com>
Authored: Wed May 31 18:46:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200
----------------------------------------------------------------------
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 17 ++++++++
.../api/java/io/jdbc/JDBCInputFormatTest.java | 43 ++++++++++++++++++++
2 files changed, 60 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index 835fb23..b7ac744 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.io.jdbc;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
@@ -30,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +115,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
private transient Connection dbConn;
private transient PreparedStatement statement;
private transient ResultSet resultSet;
+ private int fetchSize;
private boolean hasNext;
private Object[][] parameterValues;
@@ -141,6 +144,9 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
dbConn = DriverManager.getConnection(dbURL, username, password);
}
statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+ if (fetchSize > 0) {
+ statement.setFetchSize(fetchSize);
+ }
} catch (SQLException se) {
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
} catch (ClassNotFoundException cnfe) {
@@ -312,6 +318,11 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
return new DefaultInputSplitAssigner(inputSplits);
}
+ @VisibleForTesting
+ PreparedStatement getStatement() {
+ return statement;
+ }
+
/**
* A builder used to set parameters to the output format's configuration in a fluent way.
* @return builder
@@ -378,6 +389,12 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
return this;
}
+ public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
+ Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize);
+ format.fetchSize = fetchSize;
+ return this;
+ }
+
public JDBCInputFormat finish() {
if (format.username == null) {
LOG.info("Username was not supplied separately.");
http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index b1416ea..f7a86e5 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -30,7 +30,9 @@ import org.junit.Test;
import java.io.IOException;
import java.io.Serializable;
+import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.SQLException;
/**
* Tests for the {@link JDBCInputFormat}.
@@ -100,6 +102,47 @@ public class JDBCInputFormatTest extends JDBCTestBase {
.finish();
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidFetchSize() {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .setFetchSize(-7)
+ .finish();
+ }
+
+ @Test
+ public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+
+ Class.forName(DRIVER_CLASS);
+ final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize();
+
+ Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+ }
+
+ @Test
+ public void testFetchSizeCanBeConfigured() throws SQLException {
+ final int desiredFetchSize = 10_000;
+ jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(DRIVER_CLASS)
+ .setDBUrl(DB_URL)
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .setFetchSize(desiredFetchSize)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ Assert.assertEquals(desiredFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+ }
+
@Test
public void testJDBCInputFormatWithoutParallelism() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()