You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/18 14:20:26 UTC

[flink] 02/02: [hotfix] Introduce TaskDeploymentDescriptorBuilder in tests

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21e2e6515e0c93d7cdbee80e00f39178eb2bcb71
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri Aug 16 10:07:41 2019 +0200

    [hotfix] Introduce TaskDeploymentDescriptorBuilder in tests
---
 .../TaskDeploymentDescriptorBuilder.java           | 157 +++++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutorTest.java     | 133 ++---------------
 2 files changed, 170 insertions(+), 120 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorBuilder.java
new file mode 100644
index 0000000..4b9a407
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorBuilder.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Builder for {@link TaskDeploymentDescriptor} in tests; every field starts from a default so callers only override what they need.
+ */
+public class TaskDeploymentDescriptorBuilder {
+	private JobID jobId;
+	private MaybeOffloaded<JobInformation> serializedJobInformation;
+	private MaybeOffloaded<TaskInformation> serializedTaskInformation;
+	private ExecutionAttemptID executionId;
+	private AllocationID allocationId;
+	private int subtaskIndex;
+	private int attemptNumber;
+	private Collection<ResultPartitionDeploymentDescriptor> producedPartitions;
+	private Collection<InputGateDeploymentDescriptor> inputGates;
+	private int targetSlotNumber;
+
+	@Nullable
+	private JobManagerTaskRestore taskRestore;
+
+	private TaskDeploymentDescriptorBuilder(JobID jobId, String invokableClassName) throws IOException {
+		TaskInformation taskInformation = new TaskInformation(
+			new JobVertexID(),
+			"test task",
+			1,
+			1,
+			invokableClassName,
+			new Configuration());
+
+		this.jobId = jobId;
+		this.serializedJobInformation =
+			new NonOffloaded<>(new SerializedValue<>(new DummyJobInformation(jobId, "DummyJob")));
+		this.serializedTaskInformation = new NonOffloaded<>(new SerializedValue<>(taskInformation));
+		this.executionId = new ExecutionAttemptID();
+		this.allocationId = new AllocationID();
+		this.subtaskIndex = 0;
+		this.attemptNumber = 0;
+		this.producedPartitions = Collections.emptyList();
+		this.inputGates = Collections.emptyList();
+		this.targetSlotNumber = 0;
+		this.taskRestore = null; // stays null unless setTaskRestore is called
+	}
+
+	public TaskDeploymentDescriptorBuilder setSerializedJobInformation(
+			MaybeOffloaded<JobInformation> serializedJobInformation) {
+		this.serializedJobInformation = serializedJobInformation;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setSerializedTaskInformation(
+			MaybeOffloaded<TaskInformation> serializedTaskInformation) {
+		this.serializedTaskInformation = serializedTaskInformation;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setJobId(JobID jobId) {
+		this.jobId = jobId; // serializedJobInformation is not refreshed; it still wraps the job id given to newBuilder
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setExecutionId(ExecutionAttemptID executionId) {
+		this.executionId = executionId;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setAllocationId(AllocationID allocationId) {
+		this.allocationId = allocationId;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setSubtaskIndex(int subtaskIndex) {
+		this.subtaskIndex = subtaskIndex;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setAttemptNumber(int attemptNumber) {
+		this.attemptNumber = attemptNumber;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setProducedPartitions(
+			Collection<ResultPartitionDeploymentDescriptor> producedPartitions) {
+		this.producedPartitions = producedPartitions;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
+		this.inputGates = inputGates;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setTargetSlotNumber(int targetSlotNumber) {
+		this.targetSlotNumber = targetSlotNumber;
+		return this;
+	}
+
+	public TaskDeploymentDescriptorBuilder setTaskRestore(@Nullable JobManagerTaskRestore taskRestore) {
+		this.taskRestore = taskRestore;
+		return this;
+	}
+
+	public TaskDeploymentDescriptor build() {
+		return new TaskDeploymentDescriptor(
+			jobId,
+			serializedJobInformation,
+			serializedTaskInformation,
+			executionId,
+			allocationId,
+			subtaskIndex,
+			attemptNumber,
+			targetSlotNumber, // argument order differs from the field declaration order above
+			taskRestore,
+			producedPartitions,
+			inputGates);
+	}
+
+	public static TaskDeploymentDescriptorBuilder newBuilder(JobID jobId, Class<?> invokableClass) throws IOException {
+		return new TaskDeploymentDescriptorBuilder(jobId, invokableClass.getName()); // sole entry point: the constructor is private
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index bc6b296..be9fb53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -40,15 +39,11 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.DummyJobInformation;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
@@ -93,7 +88,6 @@ import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment.Build
 import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
 import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
-import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -113,7 +107,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionUtils;
 
@@ -146,7 +139,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -658,37 +650,10 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterId jobMasterId = JobMasterId.generate();
 		final JobVertexID jobVertexId = new JobVertexID();
 
-		JobInformation jobInformation = new JobInformation(
-				jobId,
-				testName.getMethodName(),
-				new SerializedValue<>(new ExecutionConfig()),
-				new Configuration(),
-				Collections.emptyList(),
-				Collections.emptyList());
-
-		TaskInformation taskInformation = new TaskInformation(
-				jobVertexId,
-				"test task",
-				1,
-				1,
-				TestInvokable.class.getName(),
-				new Configuration());
-
-		SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
-		SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
-
-		final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				jobId,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
-				new ExecutionAttemptID(),
-				allocationId,
-				0,
-				0,
-				0,
-				null,
-				Collections.emptyList(),
-				Collections.emptyList());
+		final TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorBuilder
+			.newBuilder(jobId, TestInvokable.class)
+			.setAllocationId(allocationId)
+			.build();
 
 		final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
 		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
@@ -772,8 +737,10 @@ public class TaskExecutorTest extends TestLogger {
 	public void testTaskInterruptionAndTerminationOnShutdown() throws Exception {
 		final JobMasterId jobMasterId = JobMasterId.generate();
 		final AllocationID allocationId = new AllocationID();
-		final TaskDeploymentDescriptor taskDeploymentDescriptor =
-			createTaskDeploymentDescriptor(TestInterruptableInvokable.class, allocationId);
+		final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskDeploymentDescriptorBuilder
+			.newBuilder(jobId, TestInterruptableInvokable.class)
+			.setAllocationId(allocationId)
+			.build();
 
 		final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId);
 		final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
@@ -878,31 +845,6 @@ public class TaskExecutorTest extends TestLogger {
 		return jobManagerTable;
 	}
 
-	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
-		final Class<? extends AbstractInvokable> invokableClass,
-		final AllocationID allocationId) throws IOException {
-		final TaskInformation taskInformation = new TaskInformation(
-			new JobVertexID(),
-			"test task",
-			1,
-			1,
-			invokableClass.getName(),
-			new Configuration());
-
-		return new TaskDeploymentDescriptor(
-			jobId,
-			new NonOffloaded<>(new SerializedValue<>(new DummyJobInformation(jobId, testName.getMethodName()))),
-			new NonOffloaded<>(new SerializedValue<>(taskInformation)),
-			new ExecutionAttemptID(),
-			allocationId,
-			0,
-			0,
-			0,
-			null,
-			Collections.emptyList(),
-			Collections.emptyList());
-	}
-
 	/**
 	 * Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
 	 * the job leader, it will offer all reserved slots to the JobManager.
@@ -1137,39 +1079,10 @@ public class TaskExecutorTest extends TestLogger {
 			taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
 			taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L));
 
-			final JobVertexID jobVertexId = new JobVertexID();
-
-			JobInformation jobInformation = new JobInformation(
-				jobId,
-				testName.getMethodName(),
-				new SerializedValue<>(new ExecutionConfig()),
-				new Configuration(),
-				Collections.emptyList(),
-				Collections.emptyList());
-
-			TaskInformation taskInformation = new TaskInformation(
-				jobVertexId,
-				"test task",
-				1,
-				1,
-				NoOpInvokable.class.getName(),
-				new Configuration());
-
-			SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
-			SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
-
-			final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				jobId,
-				new NonOffloaded<>(serializedJobInformation),
-				new NonOffloaded<>(serializedJobVertexInformation),
-				new ExecutionAttemptID(),
-				allocationId1,
-				0,
-				0,
-				0,
-				null,
-				Collections.emptyList(),
-				Collections.emptyList());
+			final TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorBuilder
+				.newBuilder(jobId, NoOpInvokable.class)
+				.setAllocationId(allocationId1)
+				.build();
 
 			// we have to add the job after the TaskExecutor, because otherwise the service has not
 			// been properly started. This will also offer the slots to the job master
@@ -2213,24 +2126,4 @@ public class TaskExecutorTest extends TestLogger {
 			}
 		}
 	}
-
-	/**
-	 * {@link TaskSlotTable} which completes the given future when it is started.
-	 */
-	private static class TaskSlotTableWithStartFuture extends TaskSlotTable {
-		private final CompletableFuture<Void> taskSlotTableStarted;
-
-		private TaskSlotTableWithStartFuture(
-				CompletableFuture<Void> taskSlotTableStarted,
-				TimerService<AllocationID> timerService) {
-			super(Collections.singletonList(ResourceProfile.UNKNOWN), timerService);
-			this.taskSlotTableStarted = taskSlotTableStarted;
-		}
-
-		@Override
-		public void start(SlotActions initialSlotActions) {
-			super.start(initialSlotActions);
-			taskSlotTableStarted.complete(null);
-		}
-	}
 }