You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/29 22:51:11 UTC

[4/7] flink git commit: [hotfix] [dist. coordination] Move metrics out of the Execution Graph

[hotfix] [dist. coordination] Move metrics out of the Execution Graph

ExecutionGraph-based metrics should be in their own package 'org.apache.flink.runtime.executiongraph.metrics'.
They are instantiated by whoever builds the execution graph, but not by the execution graph itself.
This separates concerns more elegantly.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c277ee17
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c277ee17
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c277ee17

Branch: refs/heads/master
Commit: c277ee17388c85c40d9c3956fe9ac524c3157130
Parents: 85f75a5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 16:54:17 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 17:11:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  45 +----
 .../executiongraph/ExecutionGraphBuilder.java   |   8 +-
 .../metrics/RestartTimeGauge.java               |  80 +++++++++
 .../executiongraph/metrics/package-info.java    |  23 +++
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../ExecutionGraphMetricsTest.java              | 173 ++++++-------------
 .../partitioner/RescalePartitionerTest.java     |   5 +-
 7 files changed, 169 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0564fd0..06b2f9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,9 +29,6 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -126,8 +123,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
 
-	static final String RESTARTING_TIME_METRIC_NAME = "restartingTime";
-
 	// --------------------------------------------------------------------------------------------
 
 	/** The lock used to secure all access to mutable fields, especially the tracking of progress
@@ -258,9 +253,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
 			slotProvider,
-			ExecutionGraph.class.getClassLoader(),
-			new UnregisteredMetricsGroup()
-		);
+			ExecutionGraph.class.getClassLoader());
 	}
 
 	public ExecutionGraph(
@@ -275,8 +268,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			SlotProvider slotProvider,
-			ClassLoader userClassLoader,
-			MetricGroup metricGroup) throws IOException {
+			ClassLoader userClassLoader) throws IOException {
 
 		checkNotNull(futureExecutor);
 		checkNotNull(jobId);
@@ -315,9 +307,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		this.scheduleAllocationTimeout = checkNotNull(timeout);
 
 		this.restartStrategy = restartStrategy;
-
-		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
-
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
 	}
 
@@ -1449,36 +1438,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	/**
-	 * Gauge which returns the last restarting time. Restarting time is the time between
-	 * JobStatus.RESTARTING and JobStatus.RUNNING or a terminal state if JobStatus.RUNNING was not
-	 * reached. If the job has not yet reached either of these states, then the time is measured
-	 * since reaching JobStatus.RESTARTING. If it is still the initial job execution, then the
-	 * gauge will return 0.
-	 */
-	private class RestartTimeGauge implements Gauge<Long> {
-
-		@Override
-		public Long getValue() {
-			long restartingTimestamp = stateTimestamps[JobStatus.RESTARTING.ordinal()];
-
-			if (restartingTimestamp <= 0) {
-				// we haven't yet restarted our job
-				return 0L;
-			} else if (stateTimestamps[JobStatus.RUNNING.ordinal()] >= restartingTimestamp) {
-				// we have transitioned to RUNNING since the last restart
-				return stateTimestamps[JobStatus.RUNNING.ordinal()] - restartingTimestamp;
-			} else if (state.isTerminalState()) {
-				// since the last restart we've switched to a terminal state without touching
-				// the RUNNING state (e.g. failing from RESTARTING)
-				return stateTimestamps[state.ordinal()] - restartingTimestamp;
-			} else {
-				// we're still somwhere between RESTARTING and RUNNING
-				return System.currentTimeMillis() - restartingTimestamp;
-			}
-		}
-	}
-
 	@Override
 	public ArchivedExecutionGraph archive() {
 		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8471178..494b7a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -102,8 +103,7 @@ public class ExecutionGraphBuilder {
 						jobGraph.getUserJarBlobKeys(),
 						jobGraph.getClasspaths(),
 						slotProvider,
-						classLoader,
-						metrics);
+						classLoader);
 		} catch (IOException e) {
 			throw new JobException("Could not create the execution graph.", e);
 		}
@@ -250,6 +250,10 @@ public class ExecutionGraphBuilder {
 					checkpointStatsTracker);
 		}
 
+		// create all the metrics for the Execution Graph
+
+		metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
+
 		return executionGraph;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
new file mode 100644
index 0000000..e0a22e3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge which returns the last restarting time.
+ * 
+ * <p>Restarting time is the time between {@link JobStatus#RESTARTING} and {@link JobStatus#RUNNING},
+ * or a terminal state if {@link JobStatus#RUNNING} was not reached.
+ * 
+ * <p>If the job has not yet reached either of these states, then the time is measured since reaching
+ * {@link JobStatus#RESTARTING}. If it is still the initial job execution, then the gauge will return 0.
+ */
+public class RestartTimeGauge implements Gauge<Long> {
+
+	public static final String METRIC_NAME = "restartingTime";
+
+	// ------------------------------------------------------------------------
+
+	private final ExecutionGraph eg;
+
+	public RestartTimeGauge(ExecutionGraph executionGraph) {
+		this.eg = checkNotNull(executionGraph);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Long getValue() {
+		final JobStatus status = eg.getState();
+
+		final long restartingTimestamp = eg.getStatusTimestamp(JobStatus.RESTARTING);
+
+		final long switchToRunningTimestamp;
+		final long lastRestartTime;
+
+		if (restartingTimestamp <= 0) {
+			// we haven't yet restarted our job
+			return 0L;
+		}
+		else if ((switchToRunningTimestamp = eg.getStatusTimestamp(JobStatus.RUNNING)) >= restartingTimestamp) {
+			// we have transitioned to RUNNING since the last restart
+			lastRestartTime = switchToRunningTimestamp - restartingTimestamp;
+		}
+		else if (status.isTerminalState()) {
+			// since the last restart we've switched to a terminal state without touching
+			// the RUNNING state (e.g. failing from RESTARTING)
+			lastRestartTime = eg.getStatusTimestamp(status) - restartingTimestamp;
+		}
+		else {
+			// we're still somewhere between RESTARTING and RUNNING
+			lastRestartTime  = System.currentTimeMillis() - restartingTimestamp;
+		}
+
+		// we guard this with 'Math.max' to avoid negative durations when clocks re-sync
+		return Math.max(lastRestartTime, 0);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java
new file mode 100644
index 0000000..8b9d205
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes implementing various metrics for the job execution,
+ * based on the {@link org.apache.flink.runtime.executiongraph.ExecutionGraph Execution Graph}.
+ */
+package org.apache.flink.runtime.executiongraph.metrics;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 81162b6..0ab031e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -95,8 +94,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
-			ClassLoader.getSystemClassLoader(),
-			new UnregisteredMetricsGroup());
+			ClassLoader.getSystemClassLoader());
 
 		executionGraph.enableCheckpointing(
 				100,

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 5496e35..97127c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -20,13 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -35,13 +29,9 @@ import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -49,9 +39,11 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -63,17 +55,16 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ExecutionGraphMetricsTest extends TestLogger {
 
@@ -86,32 +77,14 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 		try {
 			// setup execution graph with mocked scheduling logic
 			int parallelism = 1;
-	
+
 			JobVertex jobVertex = new JobVertex("TestVertex");
 			jobVertex.setParallelism(parallelism);
 			jobVertex.setInvokableClass(NoOpInvokable.class);
 			JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
-	
-			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
-			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
-	
+
 			Configuration jobConfig = new Configuration();
-	
 			Time timeout = Time.seconds(10L);
-	
-			MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-	
-			assertTrue(metricRegistry.getReporters().size() == 1);
-	
-			MetricReporter reporter = metricRegistry.getReporters().get(0);
-	
-			assertTrue(reporter instanceof TestingReporter);
-	
-			TestingReporter testingReporter = (TestingReporter) reporter;
-	
-			MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");
-	
 			Scheduler scheduler = mock(Scheduler.class);
 
 			ResourceID taskManagerId = ResourceID.generate();
@@ -163,163 +136,127 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 				Collections.<BlobKey>emptyList(),
 				Collections.<URL>emptyList(),
 				scheduler,
-				getClass().getClassLoader(),
-				metricGroup);
-	
-			// get restarting time metric
-			Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
-	
-			assertNotNull(metric);
-			assertTrue(metric instanceof Gauge);
-	
-			@SuppressWarnings("unchecked")
-			Gauge<Long> restartingTime = (Gauge<Long>) metric;
-	
+				getClass().getClassLoader());
+
+			RestartTimeGauge restartingTime = new RestartTimeGauge(executionGraph);
+
 			// check that the restarting time is 0 since it's the initial start
-			assertTrue(0L == restartingTime.getValue());
-	
+			assertEquals(0L, restartingTime.getValue().longValue());
+
 			executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-	
+
 			// start execution
-		executionGraph.scheduleForExecution();
+			executionGraph.scheduleForExecution();
+			assertEquals(0L, restartingTime.getValue().longValue());
 
-			assertTrue(0L == restartingTime.getValue());
-	
 			List<ExecutionAttemptID> executionIDs = new ArrayList<>();
-	
+
 			for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
 				executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
 			}
-	
+
 			// tell execution graph that the tasks are in state running --> job status switches to state running
 			for (ExecutionAttemptID executionID : executionIDs) {
 				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
 			}
-	
+
 			assertEquals(JobStatus.RUNNING, executionGraph.getState());
-	
-			assertTrue(0L == restartingTime.getValue());
-	
+			assertEquals(0L, restartingTime.getValue().longValue());
+
 			// fail the job so that it goes into state restarting
 			for (ExecutionAttemptID executionID : executionIDs) {
 				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
 			}
-	
+
 			assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-	
+
 			long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-	
+
 			// wait some time so that the restarting time gauge shows a value different from 0
 			Thread.sleep(50);
-	
+
 			long previousRestartingTime = restartingTime.getValue();
-	
+
 			// check that the restarting time is monotonically increasing
 			for (int i = 0; i < 10; i++) {
 				long currentRestartingTime = restartingTime.getValue();
-	
+
 				assertTrue(currentRestartingTime >= previousRestartingTime);
 				previousRestartingTime = currentRestartingTime;
 			}
-	
+
 			// check that we have measured some restarting time
 			assertTrue(previousRestartingTime > 0);
-	
+
 			// restart job
 			testingRestartStrategy.restartExecutionGraph();
-	
+
 			executionIDs.clear();
-	
+
 			for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
 				executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
 			}
-	
+
 			for (ExecutionAttemptID executionID : executionIDs) {
 				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
 			}
-	
+
 			assertEquals(JobStatus.RUNNING, executionGraph.getState());
-	
+
 			assertTrue(firstRestartingTimestamp != 0);
-	
+
 			previousRestartingTime = restartingTime.getValue();
-	
+
 			// check that the restarting time does not increase after we've reached the running state
 			for (int i = 0; i < 10; i++) {
 				long currentRestartingTime = restartingTime.getValue();
-	
+
 				assertTrue(currentRestartingTime == previousRestartingTime);
 				previousRestartingTime = currentRestartingTime;
 			}
-	
+
 			// fail job again
 			for (ExecutionAttemptID executionID : executionIDs) {
 				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
 			}
-	
+
 			assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-	
+
 			long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-	
+
 			assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
-	
+
 			Thread.sleep(50);
-	
+
 			previousRestartingTime = restartingTime.getValue();
-	
+
 			// check that the restarting time is increasing again
 			for (int i = 0; i < 10; i++) {
 				long currentRestartingTime = restartingTime.getValue();
-	
+
 				assertTrue(currentRestartingTime >= previousRestartingTime);
 				previousRestartingTime = currentRestartingTime;
 			}
-	
+
 			assertTrue(previousRestartingTime > 0);
-	
+
 			// now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
 			// for this to work, we have to use a SuppressRestartException
 			executionGraph.fail(new SuppressRestartsException(new Exception()));
-	
+
 			assertEquals(JobStatus.FAILED, executionGraph.getState());
-	
+
 			previousRestartingTime = restartingTime.getValue();
-	
+
 			for (int i = 0; i < 10; i++) {
 				long currentRestartingTime = restartingTime.getValue();
-	
+
 				assertTrue(currentRestartingTime == previousRestartingTime);
 				previousRestartingTime = currentRestartingTime;
 			}
 		} finally {
 			executor.shutdownNow();
 		}
-
-	}
-
-	public static class TestingReporter implements MetricReporter {
-
-		private final Map<String, Metric> metrics = new HashMap<>();
-
-		@Override
-		public void open(MetricConfig config) {}
-
-		@Override
-		public void close() {}
-
-		@Override
-		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
-			metrics.put(metricName, metric);
-		}
-
-		@Override
-		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-			metrics.remove(metricName);
-		}
-
-		Metric getMetric(String metricName) {
-			return metrics.get(metricName);
-		}
 	}
 
 	static class TestingRestartStrategy implements RestartStrategy {

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 43fe169..d72c37b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -44,6 +43,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Before;
 import org.junit.Test;
 
@@ -150,8 +150,7 @@ public class RescalePartitionerTest extends TestLogger {
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
-			ExecutionGraph.class.getClassLoader(),
-			new UnregisteredMetricsGroup());
+			ExecutionGraph.class.getClassLoader());
 		try {
 			eg.attachJobGraph(jobVertices);
 		}