You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/24 16:09:37 UTC

[4/6] flink git commit: [FLINK-6046] Combine ExecutionGraph parameters into JobInformation

[FLINK-6046] Combine ExecutionGraph parameters into JobInformation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/315badcf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/315badcf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/315badcf

Branch: refs/heads/master
Commit: 315badcff6fb3f252c62a21b733a3e4b5bb065b1
Parents: 7a9df74
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 20 19:04:32 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 24 18:08:31 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 36 +++-----
 .../executiongraph/ExecutionGraphBuilder.java   | 21 ++---
 ...ExecutionGraphCheckpointCoordinatorTest.java | 12 +--
 .../executiongraph/DummyJobInformation.java     | 50 +++++++++++
 .../executiongraph/FailoverRegionTest.java      | 87 ++++++++------------
 .../executiongraph/GlobalModVersionTest.java    | 34 ++++----
 .../IndividualRestartsConcurrencyTest.java      | 31 +++----
 .../PipelinedRegionFailoverConcurrencyTest.java | 27 +++---
 .../RestartPipelinedRegionStrategyTest.java     | 49 ++++-------
 .../partitioner/RescalePartitionerTest.java     | 13 +--
 10 files changed, 164 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c0004f9..fe7770b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -316,6 +316,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/**
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
+	@VisibleForTesting
 	ExecutionGraph(
 			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
@@ -328,17 +329,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			SlotProvider slotProvider,
 			@Nullable BlobServer blobServer) {
 		this(
+			new JobInformation(
+				jobId,
+				jobName,
+				serializedConfig,
+				jobConfig,
+				Collections.emptyList(),
+				Collections.emptyList()),
 			futureExecutor,
 			ioExecutor,
-			jobId,
-			jobName,
-			jobConfig,
-			serializedConfig,
 			timeout,
 			restartStrategy,
 			new RestartAllStrategy.Factory(),
-			Collections.emptyList(),
-			Collections.emptyList(),
 			slotProvider,
 			ExecutionGraph.class.getClassLoader(),
 			blobServer
@@ -346,33 +348,19 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	public ExecutionGraph(
+			JobInformation jobInformation,
 			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
-			JobID jobId,
-			String jobName,
-			Configuration jobConfig,
-			SerializedValue<ExecutionConfig> serializedConfig,
 			Time timeout,
 			RestartStrategy restartStrategy,
 			FailoverStrategy.Factory failoverStrategyFactory,
-			List<PermanentBlobKey> requiredJarFiles,
-			List<URL> requiredClasspaths,
 			SlotProvider slotProvider,
 			ClassLoader userClassLoader,
 			@Nullable BlobServer blobServer) {
 
 		checkNotNull(futureExecutor);
-		checkNotNull(jobId);
-		checkNotNull(jobName);
-		checkNotNull(jobConfig);
-
-		this.jobInformation = new JobInformation(
-			jobId,
-			jobName,
-			serializedConfig,
-			jobConfig,
-			requiredJarFiles,
-			requiredClasspaths);
+
+		this.jobInformation = Preconditions.checkNotNull(jobInformation);
 
 		// serialize the job information to do the serialisation work only once
 		try {
@@ -405,7 +393,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		this.scheduleAllocationTimeout = checkNotNull(timeout);
 
 		this.restartStrategy = restartStrategy;
-		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
+		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
 
 		this.verticesFinished = new AtomicInteger();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 3b72505..42fbfc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -106,20 +106,21 @@ public class ExecutionGraphBuilder {
 		// create a new execution graph, if none exists so far
 		final ExecutionGraph executionGraph = (prior != null) ? prior :
 				new ExecutionGraph(
-						futureExecutor,
-						ioExecutor,
+					new JobInformation(
 						jobId,
 						jobName,
-						jobGraph.getJobConfiguration(),
 						jobGraph.getSerializedExecutionConfig(),
-						timeout,
-						restartStrategy,
-						failoverStrategy,
+						jobGraph.getJobConfiguration(),
 						jobGraph.getUserJarBlobKeys(),
-						jobGraph.getClasspaths(),
-						slotProvider,
-						classLoader,
-						blobServer);
+						jobGraph.getClasspaths()),
+					futureExecutor,
+					ioExecutor,
+					timeout,
+					restartStrategy,
+					failoverStrategy,
+					slotProvider,
+					classLoader,
+					blobServer);
 
 		// set the basic properties
 

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index b89ed5d..1489f1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -31,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
@@ -81,17 +78,12 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			CheckpointIDCounter counter,
 			CompletedCheckpointStore store) throws Exception {
 		ExecutionGraph executionGraph = new ExecutionGraph(
+			new DummyJobInformation(),
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			new JobID(),
-			"test",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
 			Time.days(1L),
 			new NoRestartStrategy(),
 			new RestartAllStrategy.Factory(),
-			Collections.emptyList(),
-			Collections.emptyList(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ClassLoader.getSystemClassLoader(),
 			null);

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java
new file mode 100644
index 0000000..99105bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Simple dummy job information for testing purposes.
+ */
+public class DummyJobInformation extends JobInformation {
+
+	private static final long serialVersionUID = 6611358237464645058L;
+
+	public DummyJobInformation(JobID jobId, String jobName) throws IOException {
+		super(
+			jobId,
+			jobName,
+			new SerializedValue<>(new ExecutionConfig()),
+			new Configuration(),
+			Collections.emptyList(),
+			Collections.emptyList());
+	}
+
+	public DummyJobInformation() throws IOException {
+		this(new JobID(), "Test Job " + UUID.randomUUID());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index d631de9..1f20e12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -43,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
@@ -51,7 +48,6 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -129,20 +125,17 @@ public class FailoverRegionTest extends TestLogger {
 		List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+			new DummyJobInformation(
 				jobId,
-				jobName,
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new InfiniteDelayRestartStrategy(10),
-				new FailoverPipelinedRegionWithDirectExecutor(),
-				Collections.emptyList(),
-				Collections.emptyList(),
-				slotProvider,
-				ExecutionGraph.class.getClassLoader(),
-				null);
+				jobName),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			new InfiniteDelayRestartStrategy(10),
+			new FailoverPipelinedRegionWithDirectExecutor(),
+			slotProvider,
+			ExecutionGraph.class.getClassLoader(),
+			null);
 
 		eg.attachJobGraph(ordered);
 
@@ -233,7 +226,6 @@ public class FailoverRegionTest extends TestLogger {
 
 		final JobID jobId = new JobID();
 		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
 
 		JobVertex v1 = new JobVertex("vertex1");
 		JobVertex v2 = new JobVertex("vertex2");
@@ -257,17 +249,14 @@ public class FailoverRegionTest extends TestLogger {
 		List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
 		ExecutionGraph eg = new ExecutionGraph(
+				new DummyJobInformation(
+					jobId,
+					jobName),
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
-				jobId,
-				jobName,
-				cfg,
-				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
 				new InfiniteDelayRestartStrategy(10),
 				new RestartPipelinedRegionStrategy.Factory(),
-				Collections.emptyList(),
-				Collections.emptyList(),
 				scheduler,
 				ExecutionGraph.class.getClassLoader(),
 				null);
@@ -319,7 +308,6 @@ public class FailoverRegionTest extends TestLogger {
 
 		final JobID jobId = new JobID();
 		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
 
 		JobVertex v1 = new JobVertex("vertex1");
 		JobVertex v2 = new JobVertex("vertex2");
@@ -335,20 +323,17 @@ public class FailoverRegionTest extends TestLogger {
 		List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+			new DummyJobInformation(
 				jobId,
-				jobName,
-				cfg,
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new InfiniteDelayRestartStrategy(10),
-				new FailoverPipelinedRegionWithDirectExecutor(),
-				Collections.emptyList(),
-				Collections.emptyList(),
-				scheduler,
-				ExecutionGraph.class.getClassLoader(),
-				null);
+				jobName),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			new InfiniteDelayRestartStrategy(10),
+			new FailoverPipelinedRegionWithDirectExecutor(),
+			scheduler,
+			ExecutionGraph.class.getClassLoader(),
+			null);
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -429,7 +414,6 @@ public class FailoverRegionTest extends TestLogger {
 
 		final JobID jobId = new JobID();
 		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
 
 		JobVertex v1 = new JobVertex("vertex1");
 		JobVertex v2 = new JobVertex("vertex2");
@@ -449,20 +433,17 @@ public class FailoverRegionTest extends TestLogger {
 		List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+			new DummyJobInformation(
 				jobId,
-				jobName,
-				cfg,
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				restartStrategy,
-				new FailoverPipelinedRegionWithDirectExecutor(),
-				Collections.emptyList(),
-				Collections.emptyList(),
-				scheduler,
-				ExecutionGraph.class.getClassLoader(),
-				null);
+				jobName),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			restartStrategy,
+			new FailoverPipelinedRegionWithDirectExecutor(),
+			scheduler,
+			ExecutionGraph.class.getClassLoader(),
+			null);
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index 986fb39..b1d6692 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -32,18 +30,17 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.Random;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
-
 import static org.junit.Assert.assertEquals;
-
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class GlobalModVersionTest {
 
@@ -163,20 +160,17 @@ public class GlobalModVersionTest {
 
 		// build a simple execution graph with on job vertex, parallelism 2
 		final ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+			new DummyJobInformation(
 				jid,
-				"test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				Time.seconds(10),
-				new InfiniteDelayRestartStrategy(),
-				new CustomStrategy(failoverStrategy),
-				Collections.emptyList(),
-				Collections.emptyList(),
-				slotProvider,
-				getClass().getClassLoader(),
-				null);
+				"test job"),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			Time.seconds(10),
+			new InfiniteDelayRestartStrategy(),
+			new CustomStrategy(failoverStrategy),
+			slotProvider,
+			getClass().getClassLoader(),
+			null);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 9d7cd90..9d924c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -37,17 +35,15 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * These tests make sure that global failover (restart all) always takes precedence over
@@ -287,20 +283,17 @@ public class IndividualRestartsConcurrencyTest {
 
 		// build a simple execution graph with on job vertex, parallelism 2
 		final ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+			new DummyJobInformation(
 				jid,
-				"test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				Time.seconds(10),
-				restartStrategy,
-				failoverStrategy,
-				Collections.emptyList(),
-				Collections.emptyList(),
-				slotProvider,
-				getClass().getClassLoader(),
-				null);
+				"test job"),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			Time.seconds(10),
+			restartStrategy,
+			failoverStrategy,
+			slotProvider,
+			getClass().getClassLoader(),
+			null);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index a0351fa..93c163b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -37,11 +35,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
@@ -309,20 +305,17 @@ public class PipelinedRegionFailoverConcurrencyTest {
 
 		// build a simple execution graph with on job vertex, parallelism 2
 		final ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+			new DummyJobInformation(
 				jid,
-				"test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				Time.seconds(10),
-				restartStrategy,
-				failoverStrategy,
-				Collections.emptyList(),
-				Collections.emptyList(),
-				slotProvider,
-				getClass().getClassLoader(),
-				null);
+				"test job"),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			Time.seconds(10),
+			restartStrategy,
+			failoverStrategy,
+			slotProvider,
+			getClass().getClassLoader(),
+			null);
 
 		JobVertex jv = new JobVertex("test vertex");
 		jv.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index ecbdc46..edb39e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -32,12 +30,11 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -64,8 +61,7 @@ public class RestartPipelinedRegionStrategyTest {
 		
 		final JobID jobId = new JobID();
 		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
-		
+
 		JobVertex v1 = new JobVertex("vertex1");
 		JobVertex v2 = new JobVertex("vertex2");
 		JobVertex v3 = new JobVertex("vertex3");
@@ -94,17 +90,14 @@ public class RestartPipelinedRegionStrategyTest {
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
 		ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobId,
+				jobName),
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.emptyList(),
-            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
 			null);
@@ -151,7 +144,6 @@ public class RestartPipelinedRegionStrategyTest {
 	public void testMultipleFailoverRegions() throws Exception {
 		final JobID jobId = new JobID();
 		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
 
         JobVertex v1 = new JobVertex("vertex1");
         JobVertex v2 = new JobVertex("vertex2");
@@ -180,17 +172,14 @@ public class RestartPipelinedRegionStrategyTest {
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
 		ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobId,
+				jobName),
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.emptyList(),
-            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
 			null);
@@ -241,7 +230,6 @@ public class RestartPipelinedRegionStrategyTest {
 	public void testSingleRegionWithMixedInput() throws Exception {
 		final JobID jobId = new JobID();
 		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
 
         JobVertex v1 = new JobVertex("vertex1");
         JobVertex v2 = new JobVertex("vertex2");
@@ -271,17 +259,14 @@ public class RestartPipelinedRegionStrategyTest {
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
 		ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobId,
+				jobName),
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.emptyList(),
-            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
 			null);
@@ -327,7 +312,6 @@ public class RestartPipelinedRegionStrategyTest {
 	public void testMultiRegionNotAllToAll() throws Exception {
 		final JobID jobId = new JobID();
 		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
 
         JobVertex v1 = new JobVertex("vertex1");
         JobVertex v2 = new JobVertex("vertex2");
@@ -353,17 +337,14 @@ public class RestartPipelinedRegionStrategyTest {
 
         Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
         ExecutionGraph eg = new ExecutionGraph(
+        	new DummyJobInformation(
+        		jobId,
+				jobName),
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.emptyList(),
-            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader(),
 			null);

http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 70b030c..e72ddf7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.partitioner;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -25,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -41,13 +41,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -143,17 +141,14 @@ public class RescalePartitionerTest extends TestLogger {
 		assertEquals(2, sinkVertex.getParallelism());
 
 		ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobId,
+				jobName),
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			jobId,
-			jobName,
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
 			new RestartAllStrategy.Factory(),
-			new ArrayList<>(),
-			new ArrayList<>(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ExecutionGraph.class.getClassLoader(),
 			null);