You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/01 11:33:06 UTC
[flink] 01/02: [FLINK-13001][tests] Add TestingExecutionGraphBuilder
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a853cdab96435a1cf978ec13fb4bf484e2c0e553
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 26 13:41:51 2019 +0200
[FLINK-13001][tests] Add TestingExecutionGraphBuilder
---
.../executiongraph/ExecutionGraphTestUtils.java | 177 +++++++++++++++++----
1 file changed, 148 insertions(+), 29 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index f614795..8623eda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,15 +18,19 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -34,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -41,12 +46,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +63,7 @@ import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
@@ -397,23 +404,14 @@ public class ExecutionGraphTestUtils {
checkNotNull(vertices);
checkNotNull(timeout);
- return ExecutionGraphBuilder.buildGraph(
- null,
- new JobGraph(jid, "test job", vertices),
- new Configuration(),
- executor,
- executor,
- slotProvider,
- ExecutionGraphTestUtils.class.getClassLoader(),
- new StandaloneCheckpointRecoveryFactory(),
- timeout,
- restartStrategy,
- new UnregisteredMetricsGroup(),
- VoidBlobWriter.getInstance(),
- timeout,
- TEST_LOGGER,
- NettyShuffleMaster.INSTANCE,
- NoOpPartitionTracker.INSTANCE);
+ return new TestingExecutionGraphBuilder(vertices)
+ .setFutureExecutor(executor)
+ .setIoExecutor(executor)
+ .setSlotProvider(slotProvider)
+ .setAllocationTimeout(timeout)
+ .setRpcTimeout(timeout)
+ .setRestartStrategy(restartStrategy)
+ .build();
}
public static JobVertex createNoOpVertex(int parallelism) {
@@ -445,16 +443,10 @@ public class ExecutionGraphTestUtils {
JobVertex ajv = new JobVertex("TestVertex", id);
ajv.setInvokableClass(AbstractInvokable.class);
- ExecutionGraph graph = new ExecutionGraph(
- executor,
- executor,
- new JobID(),
- "test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+ ExecutionGraph graph = new TestingExecutionGraphBuilder(ajv)
+ .setIoExecutor(executor)
+ .setFutureExecutor(executor)
+ .build();
graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
@@ -537,4 +529,131 @@ public class ExecutionGraphTestUtils {
subtaskIndex++;
}
}
+
+ /**
+ * Builder for {@link ExecutionGraph}.
+ */
+ public static class TestingExecutionGraphBuilder {
+
+ private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+ private Time allocationTimeout = Time.seconds(10L);
+ private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+ private MetricGroup metricGroup = new UnregisteredMetricsGroup();
+ private RestartStrategy restartStrategy = new NoRestartStrategy();
+ private Time rpcTimeout = AkkaUtils.getDefaultTimeout();
+ private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+ private ClassLoader classLoader = getClass().getClassLoader();
+ private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlot()));
+ private Executor ioExecutor = TestingUtils.defaultExecutor();
+ private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+ private Configuration jobMasterConfig = new Configuration();
+ private JobGraph jobGraph;
+ private PartitionTracker partitionTracker = NoOpPartitionTracker.INSTANCE;
+
+ public TestingExecutionGraphBuilder(final JobVertex ... jobVertices) {
+ this(new JobID(), "test job", jobVertices);
+ }
+
+ public TestingExecutionGraphBuilder(final JobID jobId, final JobVertex ... jobVertices) {
+ this(jobId, "test job", jobVertices);
+ }
+
+ public TestingExecutionGraphBuilder(final String jobName, final JobVertex ... jobVertices) {
+ this(new JobID(), jobName, jobVertices);
+ }
+
+ public TestingExecutionGraphBuilder(final JobID jobId, final String jobName, final JobVertex ... jobVertices) {
+ this(new JobGraph(jobId, jobName, jobVertices));
+ }
+
+ public TestingExecutionGraphBuilder(final JobGraph jobGraph) {
+ this.jobGraph = jobGraph;
+ }
+
+ public TestingExecutionGraphBuilder setJobMasterConfig(final Configuration jobMasterConfig) {
+ this.jobMasterConfig = jobMasterConfig;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+ this.futureExecutor = futureExecutor;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setIoExecutor(final Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setSlotProvider(final SlotProvider slotProvider) {
+ this.slotProvider = slotProvider;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setClassLoader(final ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setRpcTimeout(final Time rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setRestartStrategy(final RestartStrategy restartStrategy) {
+ this.restartStrategy = restartStrategy;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setMetricGroup(final MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setBlobWriter(final BlobWriter blobWriter) {
+ this.blobWriter = blobWriter;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setAllocationTimeout(final Time allocationTimeout) {
+ this.allocationTimeout = allocationTimeout;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+ this.shuffleMaster = shuffleMaster;
+ return this;
+ }
+
+ public TestingExecutionGraphBuilder setPartitionTracker(final PartitionTracker partitionTracker) {
+ this.partitionTracker = partitionTracker;
+ return this;
+ }
+
+ public ExecutionGraph build() throws JobException, JobExecutionException {
+ return ExecutionGraphBuilder.buildGraph(
+ null,
+ jobGraph,
+ jobMasterConfig,
+ futureExecutor,
+ ioExecutor,
+ slotProvider,
+ classLoader,
+ checkpointRecoveryFactory,
+ rpcTimeout,
+ restartStrategy,
+ metricGroup,
+ blobWriter,
+ allocationTimeout,
+ TEST_LOGGER,
+ shuffleMaster,
+ partitionTracker
+ );
+ }
+ }
}