You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/01 11:33:06 UTC

[flink] 01/02: [FLINK-13001][tests] Add TestingExecutionGraphBuilder

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a853cdab96435a1cf978ec13fb4bf484e2c0e553
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 26 13:41:51 2019 +0200

    [FLINK-13001][tests] Add TestingExecutionGraphBuilder
---
 .../executiongraph/ExecutionGraphTestUtils.java    | 177 +++++++++++++++++----
 1 file changed, 148 insertions(+), 29 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index f614795..8623eda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -34,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -41,12 +46,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +63,7 @@ import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
@@ -397,23 +404,14 @@ public class ExecutionGraphTestUtils {
 		checkNotNull(vertices);
 		checkNotNull(timeout);
 
-		return ExecutionGraphBuilder.buildGraph(
-			null,
-			new JobGraph(jid, "test job", vertices),
-			new Configuration(),
-			executor,
-			executor,
-			slotProvider,
-			ExecutionGraphTestUtils.class.getClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			timeout,
-			restartStrategy,
-			new UnregisteredMetricsGroup(),
-			VoidBlobWriter.getInstance(),
-			timeout,
-			TEST_LOGGER,
-			NettyShuffleMaster.INSTANCE,
-			NoOpPartitionTracker.INSTANCE);
+		return new TestingExecutionGraphBuilder(vertices)
+			.setFutureExecutor(executor)
+			.setIoExecutor(executor)
+			.setSlotProvider(slotProvider)
+			.setAllocationTimeout(timeout)
+			.setRpcTimeout(timeout)
+			.setRestartStrategy(restartStrategy)
+			.build();
 	}
 
 	public static JobVertex createNoOpVertex(int parallelism) {
@@ -445,16 +443,10 @@ public class ExecutionGraphTestUtils {
 		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(AbstractInvokable.class);
 
-		ExecutionGraph graph = new ExecutionGraph(
-			executor,
-			executor,
-			new JobID(), 
-			"test job", 
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+		ExecutionGraph graph = new TestingExecutionGraphBuilder(ajv)
+			.setIoExecutor(executor)
+			.setFutureExecutor(executor)
+			.build();
 
 		graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
 
@@ -537,4 +529,131 @@ public class ExecutionGraphTestUtils {
 			subtaskIndex++;
 		}
 	}
+
+	/**
+	 * Fluent builder for {@link ExecutionGraph}s in tests: every setting except the job graph has a default, each setter returns {@code this}, and {@link #build()} forwards all settings to {@code ExecutionGraphBuilder.buildGraph}.
+	 */
+	public static class TestingExecutionGraphBuilder {
+
+		private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+		private Time allocationTimeout = Time.seconds(10L);
+		private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+		private MetricGroup metricGroup = new UnregisteredMetricsGroup();
+		private RestartStrategy restartStrategy = new NoRestartStrategy();
+		private Time rpcTimeout = AkkaUtils.getDefaultTimeout();
+		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		private ClassLoader classLoader = getClass().getClassLoader(); // class loader of the builder's own runtime class
+		private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlot())); // default function returns an already-completed future holding a new TestingLogicalSlot
+		private Executor ioExecutor = TestingUtils.defaultExecutor();
+		private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+		private Configuration jobMasterConfig = new Configuration();
+		private JobGraph jobGraph; // only assigned by the constructors; there is no setter and no default
+		private PartitionTracker partitionTracker = NoOpPartitionTracker.INSTANCE;
+
+		public TestingExecutionGraphBuilder(final JobVertex ... jobVertices) {
+			this(new JobID(), "test job", jobVertices); // fresh JobID and the default name "test job"
+		}
+
+		public TestingExecutionGraphBuilder(final JobID jobId, final JobVertex ... jobVertices) {
+			this(jobId, "test job", jobVertices); // caller-supplied id, default name "test job"
+		}
+
+		public TestingExecutionGraphBuilder(final String jobName, final JobVertex ... jobVertices) {
+			this(new JobID(), jobName, jobVertices); // caller-supplied name, fresh JobID
+		}
+
+		public TestingExecutionGraphBuilder(final JobID jobId, final String jobName, final JobVertex ... jobVertices) {
+			this(new JobGraph(jobId, jobName, jobVertices)); // wraps the vertices in a new JobGraph
+		}
+
+		public TestingExecutionGraphBuilder(final JobGraph jobGraph) {
+			this.jobGraph = jobGraph; // stored as-is; no null check is performed here
+		}
+
+		public TestingExecutionGraphBuilder setJobMasterConfig(final Configuration jobMasterConfig) {
+			this.jobMasterConfig = jobMasterConfig;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+			this.futureExecutor = futureExecutor;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setIoExecutor(final Executor ioExecutor) {
+			this.ioExecutor = ioExecutor;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setSlotProvider(final SlotProvider slotProvider) {
+			this.slotProvider = slotProvider;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setClassLoader(final ClassLoader classLoader) {
+			this.classLoader = classLoader;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setRpcTimeout(final Time rpcTimeout) {
+			this.rpcTimeout = rpcTimeout;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setRestartStrategy(final RestartStrategy restartStrategy) {
+			this.restartStrategy = restartStrategy;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setMetricGroup(final MetricGroup metricGroup) {
+			this.metricGroup = metricGroup;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setBlobWriter(final BlobWriter blobWriter) {
+			this.blobWriter = blobWriter;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setAllocationTimeout(final Time allocationTimeout) {
+			this.allocationTimeout = allocationTimeout;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+			this.shuffleMaster = shuffleMaster;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setPartitionTracker(final PartitionTracker partitionTracker) {
+			this.partitionTracker = partitionTracker;
+			return this;
+		}
+
+		public ExecutionGraph build() throws JobException, JobExecutionException {
+			return ExecutionGraphBuilder.buildGraph( // arguments are positional; keep this order in sync with buildGraph's signature
+				null, // always null and not configurable; presumably "no previous graph to reuse" - confirm against ExecutionGraphBuilder
+				jobGraph,
+				jobMasterConfig,
+				futureExecutor,
+				ioExecutor,
+				slotProvider,
+				classLoader,
+				checkpointRecoveryFactory,
+				rpcTimeout,
+				restartStrategy,
+				metricGroup,
+				blobWriter,
+				allocationTimeout,
+				TEST_LOGGER, // not configurable; resolved from the enclosing test-utility class
+				shuffleMaster,
+				partitionTracker
+			);
+		}
+	}
 }