You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 14:48:43 UTC
[flink] 02/02: [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit acf228cc5c88872428ccb10296aa1646837c16c2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 31 11:39:55 2019 +0100
[FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest
This closes #7532.
---
.../deployment/TaskDeploymentDescriptorTest.java | 178 +++++++++++----------
1 file changed, 93 insertions(+), 85 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 22e943b..a617ce1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -33,107 +33,115 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* Tests for the {@link TaskDeploymentDescriptor}.
*/
-public class TaskDeploymentDescriptorTest {
+public class TaskDeploymentDescriptorTest extends TestLogger {
+
+ private static final JobID jobID = new JobID();
+ private static final JobVertexID vertexID = new JobVertexID();
+ private static final ExecutionAttemptID execId = new ExecutionAttemptID();
+ private static final AllocationID allocationId = new AllocationID();
+ private static final String jobName = "job name";
+ private static final String taskName = "task name";
+ private static final int numberOfKeyGroups = 1;
+ private static final int indexInSubtaskGroup = 0;
+ private static final int currentNumberOfSubtasks = 1;
+ private static final int attemptNumber = 0;
+ private static final Configuration jobConfiguration = new Configuration();
+ private static final Configuration taskConfiguration = new Configuration();
+ private static final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
+ private static final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
+ private static final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
+ private static final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
+ private static final List<URL> requiredClasspaths = new ArrayList<>(0);
+ private static final int targetSlotNumber = 47;
+ private static final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
+ private static final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
+
+ private final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
+ private final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
+ jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
+ private final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
+ vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
+
+ public TaskDeploymentDescriptorTest() throws IOException {}
+
@Test
- public void testSerialization() {
- try {
- final JobID jobID = new JobID();
- final JobVertexID vertexID = new JobVertexID();
- final ExecutionAttemptID execId = new ExecutionAttemptID();
- final AllocationID allocationId = new AllocationID();
- final String jobName = "job name";
- final String taskName = "task name";
- final int numberOfKeyGroups = 1;
- final int indexInSubtaskGroup = 0;
- final int currentNumberOfSubtasks = 1;
- final int attemptNumber = 0;
- final Configuration jobConfiguration = new Configuration();
- final Configuration taskConfiguration = new Configuration();
- final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
- final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
- final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
- final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
- final List<URL> requiredClasspaths = new ArrayList<>(0);
- final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
- final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
- jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
- final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
- vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
- final int targetSlotNumber = 47;
- final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
- final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
-
- final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(
- jobID,
- new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
- new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
- execId,
- allocationId,
- indexInSubtaskGroup,
- attemptNumber,
- targetSlotNumber,
- taskRestore,
- producedResults,
- inputGates);
-
- final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
-
- assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
- assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
- assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
- assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
- assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
- assertFalse(orig.getInputGates() == copy.getInputGates());
-
- assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
- assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
- assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
- assertEquals(orig.getAllocationId(), copy.getAllocationId());
- assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
- assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
- assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
- assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
- assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
- assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
- assertEquals(orig.getInputGates(), copy.getInputGates());
-
- final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
- jobID,
- new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
- new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()),
- execId,
- allocationId,
- indexInSubtaskGroup,
- attemptNumber,
- targetSlotNumber,
- taskRestore,
- producedResults,
- inputGates);
- try {
- testOffLoadedTaskInformation.getSerializedTaskInformation();
- } catch (Exception e) {
- assertTrue(e instanceof IllegalStateException);
- }
+ public void testSerialization() throws Exception {
+ final TaskDeploymentDescriptor orig = createTaskDeploymentDescriptor(
+ new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+ new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation));
+ final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
+
+ assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
+ assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
+ assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
+ assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
+ assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
+ assertFalse(orig.getInputGates() == copy.getInputGates());
+
+ assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
+ assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
+ assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
+ assertEquals(orig.getAllocationId(), copy.getAllocationId());
+ assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
+ assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
+ assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
+ assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
+ assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
+ assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
+ assertEquals(orig.getInputGates(), copy.getInputGates());
+ }
+
+ @Test
+ public void testOffLoadedAndNonOffLoadedPayload() {
+ final TaskDeploymentDescriptor taskDeploymentDescriptor = createTaskDeploymentDescriptor(
+ new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+ new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()));
+
+ SerializedValue<JobInformation> actualSerializedJobInformation = taskDeploymentDescriptor.getSerializedJobInformation();
+ assertThat(actualSerializedJobInformation, is(serializedJobInformation));
+
+ try {
+ taskDeploymentDescriptor.getSerializedTaskInformation();
+ fail("Expected to fail since the task information should be offloaded.");
+ } catch (IllegalStateException expected) {
+ // expected
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+
+ @Nonnull
+ private TaskDeploymentDescriptor createTaskDeploymentDescriptor(TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> jobInformation, TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInformation) {
+ return new TaskDeploymentDescriptor(
+ jobID,
+ jobInformation,
+ taskInformation,
+ execId,
+ allocationId,
+ indexInSubtaskGroup,
+ attemptNumber,
+ targetSlotNumber,
+ taskRestore,
+ producedResults,
+ inputGates);
}
}