You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/24 16:09:37 UTC
[4/6] flink git commit: [FLINK-6046] Combine ExecutionGraph parameters into JobInformation
[FLINK-6046] Combine ExecutionGraph parameters into JobInformation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/315badcf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/315badcf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/315badcf
Branch: refs/heads/master
Commit: 315badcff6fb3f252c62a21b733a3e4b5bb065b1
Parents: 7a9df74
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 20 19:04:32 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 24 18:08:31 2017 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 36 +++-----
.../executiongraph/ExecutionGraphBuilder.java | 21 ++---
...ExecutionGraphCheckpointCoordinatorTest.java | 12 +--
.../executiongraph/DummyJobInformation.java | 50 +++++++++++
.../executiongraph/FailoverRegionTest.java | 87 ++++++++------------
.../executiongraph/GlobalModVersionTest.java | 34 ++++----
.../IndividualRestartsConcurrencyTest.java | 31 +++----
.../PipelinedRegionFailoverConcurrencyTest.java | 27 +++---
.../RestartPipelinedRegionStrategyTest.java | 49 ++++-------
.../partitioner/RescalePartitionerTest.java | 13 +--
10 files changed, 164 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c0004f9..fe7770b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -316,6 +316,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/**
* This constructor is for tests only, because it does not include class loading information.
*/
+ @VisibleForTesting
ExecutionGraph(
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
@@ -328,17 +329,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
SlotProvider slotProvider,
@Nullable BlobServer blobServer) {
this(
+ new JobInformation(
+ jobId,
+ jobName,
+ serializedConfig,
+ jobConfig,
+ Collections.emptyList(),
+ Collections.emptyList()),
futureExecutor,
ioExecutor,
- jobId,
- jobName,
- jobConfig,
- serializedConfig,
timeout,
restartStrategy,
new RestartAllStrategy.Factory(),
- Collections.emptyList(),
- Collections.emptyList(),
slotProvider,
ExecutionGraph.class.getClassLoader(),
blobServer
@@ -346,33 +348,19 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
public ExecutionGraph(
+ JobInformation jobInformation,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
- JobID jobId,
- String jobName,
- Configuration jobConfig,
- SerializedValue<ExecutionConfig> serializedConfig,
Time timeout,
RestartStrategy restartStrategy,
FailoverStrategy.Factory failoverStrategyFactory,
- List<PermanentBlobKey> requiredJarFiles,
- List<URL> requiredClasspaths,
SlotProvider slotProvider,
ClassLoader userClassLoader,
@Nullable BlobServer blobServer) {
checkNotNull(futureExecutor);
- checkNotNull(jobId);
- checkNotNull(jobName);
- checkNotNull(jobConfig);
-
- this.jobInformation = new JobInformation(
- jobId,
- jobName,
- serializedConfig,
- jobConfig,
- requiredJarFiles,
- requiredClasspaths);
+
+ this.jobInformation = Preconditions.checkNotNull(jobInformation);
// serialize the job information to do the serialisation work only once
try {
@@ -405,7 +393,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
this.scheduleAllocationTimeout = checkNotNull(timeout);
this.restartStrategy = restartStrategy;
- this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
+ this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
this.verticesFinished = new AtomicInteger();
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 3b72505..42fbfc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -106,20 +106,21 @@ public class ExecutionGraphBuilder {
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph = (prior != null) ? prior :
new ExecutionGraph(
- futureExecutor,
- ioExecutor,
+ new JobInformation(
jobId,
jobName,
- jobGraph.getJobConfiguration(),
jobGraph.getSerializedExecutionConfig(),
- timeout,
- restartStrategy,
- failoverStrategy,
+ jobGraph.getJobConfiguration(),
jobGraph.getUserJarBlobKeys(),
- jobGraph.getClasspaths(),
- slotProvider,
- classLoader,
- blobServer);
+ jobGraph.getClasspaths()),
+ futureExecutor,
+ ioExecutor,
+ timeout,
+ restartStrategy,
+ failoverStrategy,
+ slotProvider,
+ classLoader,
+ blobServer);
// set the basic properties
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index b89ed5d..1489f1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -31,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
import org.junit.Test;
@@ -81,17 +78,12 @@ public class ExecutionGraphCheckpointCoordinatorTest {
CheckpointIDCounter counter,
CompletedCheckpointStore store) throws Exception {
ExecutionGraph executionGraph = new ExecutionGraph(
+ new DummyJobInformation(),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new JobID(),
- "test",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
Time.days(1L),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
- Collections.emptyList(),
- Collections.emptyList(),
new Scheduler(TestingUtils.defaultExecutionContext()),
ClassLoader.getSystemClassLoader(),
null);
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java
new file mode 100644
index 0000000..99105bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DummyJobInformation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Simple dummy job information for testing purposes.
+ */
+public class DummyJobInformation extends JobInformation {
+
+ private static final long serialVersionUID = 6611358237464645058L;
+
+ public DummyJobInformation(JobID jobId, String jobName) throws IOException {
+ super(
+ jobId,
+ jobName,
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ public DummyJobInformation() throws IOException {
+ this(new JobID(), "Test Job " + UUID.randomUUID());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index d631de9..1f20e12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -18,11 +18,9 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -43,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Ignore;
@@ -51,7 +48,6 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -129,20 +125,17 @@ public class FailoverRegionTest extends TestLogger {
List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ new DummyJobInformation(
jobId,
- jobName,
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new InfiniteDelayRestartStrategy(10),
- new FailoverPipelinedRegionWithDirectExecutor(),
- Collections.emptyList(),
- Collections.emptyList(),
- slotProvider,
- ExecutionGraph.class.getClassLoader(),
- null);
+ jobName),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ AkkaUtils.getDefaultTimeout(),
+ new InfiniteDelayRestartStrategy(10),
+ new FailoverPipelinedRegionWithDirectExecutor(),
+ slotProvider,
+ ExecutionGraph.class.getClassLoader(),
+ null);
eg.attachJobGraph(ordered);
@@ -233,7 +226,6 @@ public class FailoverRegionTest extends TestLogger {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
@@ -257,17 +249,14 @@ public class FailoverRegionTest extends TestLogger {
List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
ExecutionGraph eg = new ExecutionGraph(
+ new DummyJobInformation(
+ jobId,
+ jobName),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new InfiniteDelayRestartStrategy(10),
new RestartPipelinedRegionStrategy.Factory(),
- Collections.emptyList(),
- Collections.emptyList(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
@@ -319,7 +308,6 @@ public class FailoverRegionTest extends TestLogger {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
@@ -335,20 +323,17 @@ public class FailoverRegionTest extends TestLogger {
List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ new DummyJobInformation(
jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new InfiniteDelayRestartStrategy(10),
- new FailoverPipelinedRegionWithDirectExecutor(),
- Collections.emptyList(),
- Collections.emptyList(),
- scheduler,
- ExecutionGraph.class.getClassLoader(),
- null);
+ jobName),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ AkkaUtils.getDefaultTimeout(),
+ new InfiniteDelayRestartStrategy(10),
+ new FailoverPipelinedRegionWithDirectExecutor(),
+ scheduler,
+ ExecutionGraph.class.getClassLoader(),
+ null);
try {
eg.attachJobGraph(ordered);
}
@@ -429,7 +414,6 @@ public class FailoverRegionTest extends TestLogger {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
@@ -449,20 +433,17 @@ public class FailoverRegionTest extends TestLogger {
List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ new DummyJobInformation(
jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- restartStrategy,
- new FailoverPipelinedRegionWithDirectExecutor(),
- Collections.emptyList(),
- Collections.emptyList(),
- scheduler,
- ExecutionGraph.class.getClassLoader(),
- null);
+ jobName),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ AkkaUtils.getDefaultTimeout(),
+ restartStrategy,
+ new FailoverPipelinedRegionWithDirectExecutor(),
+ scheduler,
+ ExecutionGraph.class.getClassLoader(),
+ null);
try {
eg.attachJobGraph(ordered);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index 986fb39..b1d6692 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -32,18 +30,17 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.SerializedValue;
import org.junit.Test;
-import java.util.Collections;
import java.util.Random;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
-
import static org.junit.Assert.assertEquals;
-
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class GlobalModVersionTest {
@@ -163,20 +160,17 @@ public class GlobalModVersionTest {
// build a simple execution graph with on job vertex, parallelism 2
final ExecutionGraph graph = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ new DummyJobInformation(
jid,
- "test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- Time.seconds(10),
- new InfiniteDelayRestartStrategy(),
- new CustomStrategy(failoverStrategy),
- Collections.emptyList(),
- Collections.emptyList(),
- slotProvider,
- getClass().getClassLoader(),
- null);
+ "test job"),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ Time.seconds(10),
+ new InfiniteDelayRestartStrategy(),
+ new CustomStrategy(failoverStrategy),
+ slotProvider,
+ getClass().getClassLoader(),
+ null);
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 9d7cd90..9d924c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -37,17 +35,15 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.SerializedValue;
import org.junit.Test;
-import java.util.Collections;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* These tests make sure that global failover (restart all) always takes precedence over
@@ -287,20 +283,17 @@ public class IndividualRestartsConcurrencyTest {
// build a simple execution graph with on job vertex, parallelism 2
final ExecutionGraph graph = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ new DummyJobInformation(
jid,
- "test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- Time.seconds(10),
- restartStrategy,
- failoverStrategy,
- Collections.emptyList(),
- Collections.emptyList(),
- slotProvider,
- getClass().getClassLoader(),
- null);
+ "test job"),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ Time.seconds(10),
+ restartStrategy,
+ failoverStrategy,
+ slotProvider,
+ getClass().getClassLoader(),
+ null);
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index a0351fa..93c163b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -37,11 +35,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.SerializedValue;
import org.junit.Test;
-import java.util.Collections;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
@@ -309,20 +305,17 @@ public class PipelinedRegionFailoverConcurrencyTest {
// build a simple execution graph with on job vertex, parallelism 2
final ExecutionGraph graph = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ new DummyJobInformation(
jid,
- "test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- Time.seconds(10),
- restartStrategy,
- failoverStrategy,
- Collections.emptyList(),
- Collections.emptyList(),
- slotProvider,
- getClass().getClassLoader(),
- null);
+ "test job"),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ Time.seconds(10),
+ restartStrategy,
+ failoverStrategy,
+ slotProvider,
+ getClass().getClassLoader(),
+ null);
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index ecbdc46..edb39e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -18,11 +18,9 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -32,12 +30,11 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
+
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -64,8 +61,7 @@ public class RestartPipelinedRegionStrategyTest {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
-
+
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
@@ -94,17 +90,14 @@ public class RestartPipelinedRegionStrategyTest {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
ExecutionGraph eg = new ExecutionGraph(
+ new DummyJobInformation(
+ jobId,
+ jobName),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- Collections.emptyList(),
- Collections.emptyList(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
@@ -151,7 +144,6 @@ public class RestartPipelinedRegionStrategyTest {
public void testMultipleFailoverRegions() throws Exception {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
@@ -180,17 +172,14 @@ public class RestartPipelinedRegionStrategyTest {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
ExecutionGraph eg = new ExecutionGraph(
+ new DummyJobInformation(
+ jobId,
+ jobName),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- Collections.emptyList(),
- Collections.emptyList(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
@@ -241,7 +230,6 @@ public class RestartPipelinedRegionStrategyTest {
public void testSingleRegionWithMixedInput() throws Exception {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
@@ -271,17 +259,14 @@ public class RestartPipelinedRegionStrategyTest {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
ExecutionGraph eg = new ExecutionGraph(
+ new DummyJobInformation(
+ jobId,
+ jobName),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- Collections.emptyList(),
- Collections.emptyList(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
@@ -327,7 +312,6 @@ public class RestartPipelinedRegionStrategyTest {
public void testMultiRegionNotAllToAll() throws Exception {
final JobID jobId = new JobID();
final String jobName = "Test Job Sample Name";
- final Configuration cfg = new Configuration();
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
@@ -353,17 +337,14 @@ public class RestartPipelinedRegionStrategyTest {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
ExecutionGraph eg = new ExecutionGraph(
+ new DummyJobInformation(
+ jobId,
+ jobName),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- Collections.emptyList(),
- Collections.emptyList(),
scheduler,
ExecutionGraph.class.getClassLoader(),
null);
http://git-wip-us.apache.org/repos/asf/flink/blob/315badcf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 70b030c..e72ddf7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.runtime.partitioner;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
@@ -25,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -41,13 +41,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -143,17 +141,14 @@ public class RescalePartitionerTest extends TestLogger {
assertEquals(2, sinkVertex.getParallelism());
ExecutionGraph eg = new ExecutionGraph(
+ new DummyJobInformation(
+ jobId,
+ jobName),
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
- new ArrayList<>(),
- new ArrayList<>(),
new Scheduler(TestingUtils.defaultExecutionContext()),
ExecutionGraph.class.getClassLoader(),
null);