You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/18 14:20:26 UTC
[flink] 02/02: [hotfix] Introduce TaskDeploymentDescriptorBuilder in tests
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21e2e6515e0c93d7cdbee80e00f39178eb2bcb71
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri Aug 16 10:07:41 2019 +0200
[hotfix] Introduce TaskDeploymentDescriptorBuilder in tests
---
.../TaskDeploymentDescriptorBuilder.java | 157 +++++++++++++++++++++
.../runtime/taskexecutor/TaskExecutorTest.java | 133 ++---------------
2 files changed, 170 insertions(+), 120 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorBuilder.java
new file mode 100644
index 0000000..4b9a407
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorBuilder.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Builder for {@link TaskDeploymentDescriptor}. Instances are created via {@link #newBuilder(JobID, Class)}, which fills every field with a default; each setter overrides one value and returns this builder so calls can be chained before {@link #build()}.
+ */
+public class TaskDeploymentDescriptorBuilder {
+ private JobID jobId; // initialized from the jobId passed to the constructor
+ private MaybeOffloaded<JobInformation> serializedJobInformation; // default: NonOffloaded DummyJobInformation(jobId, "DummyJob")
+ private MaybeOffloaded<TaskInformation> serializedTaskInformation; // default: NonOffloaded TaskInformation built in the constructor
+ private ExecutionAttemptID executionId; // default: new ExecutionAttemptID()
+ private AllocationID allocationId; // default: new AllocationID()
+ private int subtaskIndex; // default: 0
+ private int attemptNumber; // default: 0
+ private Collection<ResultPartitionDeploymentDescriptor> producedPartitions; // default: empty list
+ private Collection<InputGateDeploymentDescriptor> inputGates; // default: empty list
+ private int targetSlotNumber; // default: 0
+
+ @Nullable
+ private JobManagerTaskRestore taskRestore; // default: null
+
+ private TaskDeploymentDescriptorBuilder(JobID jobId, String invokableClassName) throws IOException { // private: use newBuilder(...)
+ TaskInformation taskInformation = new TaskInformation( // default task information; only the invokable class name varies
+ new JobVertexID(),
+ "test task",
+ 1,
+ 1,
+ invokableClassName,
+ new Configuration());
+
+ this.jobId = jobId;
+ this.serializedJobInformation =
+ new NonOffloaded<>(new SerializedValue<>(new DummyJobInformation(jobId, "DummyJob"))); // NOTE(review): the declared IOException presumably originates from SerializedValue - confirm
+ this.serializedTaskInformation = new NonOffloaded<>(new SerializedValue<>(taskInformation));
+ this.executionId = new ExecutionAttemptID();
+ this.allocationId = new AllocationID();
+ this.subtaskIndex = 0;
+ this.attemptNumber = 0;
+ this.producedPartitions = Collections.emptyList();
+ this.inputGates = Collections.emptyList();
+ this.targetSlotNumber = 0;
+ this.taskRestore = null;
+ }
+
+ public TaskDeploymentDescriptorBuilder setSerializedJobInformation( // replaces the default DummyJobInformation-based value
+ MaybeOffloaded<JobInformation> serializedJobInformation) {
+ this.serializedJobInformation = serializedJobInformation;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setSerializedTaskInformation( // replaces the TaskInformation built in the constructor
+ MaybeOffloaded<TaskInformation> serializedTaskInformation) {
+ this.serializedTaskInformation = serializedTaskInformation;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setJobId(JobID jobId) { // NOTE: only updates the jobId field; serializedJobInformation keeps the value built from the constructor's jobId
+ this.jobId = jobId;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setExecutionId(ExecutionAttemptID executionId) { // overrides the generated ExecutionAttemptID
+ this.executionId = executionId;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setAllocationId(AllocationID allocationId) { // overrides the generated AllocationID
+ this.allocationId = allocationId;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setSubtaskIndex(int subtaskIndex) { // overrides the default of 0
+ this.subtaskIndex = subtaskIndex;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setAttemptNumber(int attemptNumber) { // overrides the default of 0
+ this.attemptNumber = attemptNumber;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setProducedPartitions( // overrides the default empty list; the collection is stored as given, not copied
+ Collection<ResultPartitionDeploymentDescriptor> producedPartitions) {
+ this.producedPartitions = producedPartitions;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) { // overrides the default empty list; stored as given, not copied
+ this.inputGates = inputGates;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setTargetSlotNumber(int targetSlotNumber) { // overrides the default of 0
+ this.targetSlotNumber = targetSlotNumber;
+ return this;
+ }
+
+ public TaskDeploymentDescriptorBuilder setTaskRestore(@Nullable JobManagerTaskRestore taskRestore) { // null is allowed and is also the default
+ this.taskRestore = taskRestore;
+ return this;
+ }
+
+ public TaskDeploymentDescriptor build() { // creates a new descriptor from the builder's current field values
+ return new TaskDeploymentDescriptor( // argument order differs from field declaration order: targetSlotNumber and taskRestore precede producedPartitions and inputGates
+ jobId,
+ serializedJobInformation,
+ serializedTaskInformation,
+ executionId,
+ allocationId,
+ subtaskIndex,
+ attemptNumber,
+ targetSlotNumber,
+ taskRestore,
+ producedPartitions,
+ inputGates);
+ }
+
+ public static TaskDeploymentDescriptorBuilder newBuilder(JobID jobId, Class<?> invokableClass) throws IOException { // sole entry point; only invokableClass.getName() is used
+ return new TaskDeploymentDescriptorBuilder(jobId, invokableClass.getName());
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index bc6b296..be9fb53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.taskexecutor;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -40,15 +39,11 @@ import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.DummyJobInformation;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
@@ -93,7 +88,6 @@ import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment.Build
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
-import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -113,7 +107,6 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
@@ -146,7 +139,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -658,37 +650,10 @@ public class TaskExecutorTest extends TestLogger {
final JobMasterId jobMasterId = JobMasterId.generate();
final JobVertexID jobVertexId = new JobVertexID();
- JobInformation jobInformation = new JobInformation(
- jobId,
- testName.getMethodName(),
- new SerializedValue<>(new ExecutionConfig()),
- new Configuration(),
- Collections.emptyList(),
- Collections.emptyList());
-
- TaskInformation taskInformation = new TaskInformation(
- jobVertexId,
- "test task",
- 1,
- 1,
- TestInvokable.class.getName(),
- new Configuration());
-
- SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
- SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
-
- final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- jobId,
- new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
- new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
- new ExecutionAttemptID(),
- allocationId,
- 0,
- 0,
- 0,
- null,
- Collections.emptyList(),
- Collections.emptyList());
+ final TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorBuilder
+ .newBuilder(jobId, TestInvokable.class)
+ .setAllocationId(allocationId)
+ .build();
final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
@@ -772,8 +737,10 @@ public class TaskExecutorTest extends TestLogger {
public void testTaskInterruptionAndTerminationOnShutdown() throws Exception {
final JobMasterId jobMasterId = JobMasterId.generate();
final AllocationID allocationId = new AllocationID();
- final TaskDeploymentDescriptor taskDeploymentDescriptor =
- createTaskDeploymentDescriptor(TestInterruptableInvokable.class, allocationId);
+ final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskDeploymentDescriptorBuilder
+ .newBuilder(jobId, TestInterruptableInvokable.class)
+ .setAllocationId(allocationId)
+ .build();
final JobManagerTable jobManagerTable = createJobManagerTableWithOneJob(jobMasterId);
final TaskExecutor taskExecutor = createTaskExecutorWithJobManagerTable(jobManagerTable);
@@ -878,31 +845,6 @@ public class TaskExecutorTest extends TestLogger {
return jobManagerTable;
}
- private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
- final Class<? extends AbstractInvokable> invokableClass,
- final AllocationID allocationId) throws IOException {
- final TaskInformation taskInformation = new TaskInformation(
- new JobVertexID(),
- "test task",
- 1,
- 1,
- invokableClass.getName(),
- new Configuration());
-
- return new TaskDeploymentDescriptor(
- jobId,
- new NonOffloaded<>(new SerializedValue<>(new DummyJobInformation(jobId, testName.getMethodName()))),
- new NonOffloaded<>(new SerializedValue<>(taskInformation)),
- new ExecutionAttemptID(),
- allocationId,
- 0,
- 0,
- 0,
- null,
- Collections.emptyList(),
- Collections.emptyList());
- }
-
/**
* Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
* the job leader, it will offer all reserved slots to the JobManager.
@@ -1137,39 +1079,10 @@ public class TaskExecutorTest extends TestLogger {
taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L));
- final JobVertexID jobVertexId = new JobVertexID();
-
- JobInformation jobInformation = new JobInformation(
- jobId,
- testName.getMethodName(),
- new SerializedValue<>(new ExecutionConfig()),
- new Configuration(),
- Collections.emptyList(),
- Collections.emptyList());
-
- TaskInformation taskInformation = new TaskInformation(
- jobVertexId,
- "test task",
- 1,
- 1,
- NoOpInvokable.class.getName(),
- new Configuration());
-
- SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
- SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
-
- final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- jobId,
- new NonOffloaded<>(serializedJobInformation),
- new NonOffloaded<>(serializedJobVertexInformation),
- new ExecutionAttemptID(),
- allocationId1,
- 0,
- 0,
- 0,
- null,
- Collections.emptyList(),
- Collections.emptyList());
+ final TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorBuilder
+ .newBuilder(jobId, NoOpInvokable.class)
+ .setAllocationId(allocationId1)
+ .build();
// we have to add the job after the TaskExecutor, because otherwise the service has not
// been properly started. This will also offer the slots to the job master
@@ -2213,24 +2126,4 @@ public class TaskExecutorTest extends TestLogger {
}
}
}
-
- /**
- * {@link TaskSlotTable} which completes the given future when it is started.
- */
- private static class TaskSlotTableWithStartFuture extends TaskSlotTable {
- private final CompletableFuture<Void> taskSlotTableStarted;
-
- private TaskSlotTableWithStartFuture(
- CompletableFuture<Void> taskSlotTableStarted,
- TimerService<AllocationID> timerService) {
- super(Collections.singletonList(ResourceProfile.UNKNOWN), timerService);
- this.taskSlotTableStarted = taskSlotTableStarted;
- }
-
- @Override
- public void start(SlotActions initialSlotActions) {
- super.start(initialSlotActions);
- taskSlotTableStarted.complete(null);
- }
- }
}