You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 14:48:43 UTC

[flink] 02/02: [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acf228cc5c88872428ccb10296aa1646837c16c2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 31 11:39:55 2019 +0100

    [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest
    
    This closes #7532.
---
 .../deployment/TaskDeploymentDescriptorTest.java   | 178 +++++++++++----------
 1 file changed, 93 insertions(+), 85 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 22e943b..a617ce1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -33,107 +33,115 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link TaskDeploymentDescriptor}.
  */
-public class TaskDeploymentDescriptorTest {
+public class TaskDeploymentDescriptorTest extends TestLogger {
+
+	private static final JobID jobID = new JobID();
+	private static final JobVertexID vertexID = new JobVertexID();
+	private static final ExecutionAttemptID execId = new ExecutionAttemptID();
+	private static final AllocationID allocationId = new AllocationID();
+	private static final String jobName = "job name";
+	private static final String taskName = "task name";
+	private static final int numberOfKeyGroups = 1;
+	private static final int indexInSubtaskGroup = 0;
+	private static final int currentNumberOfSubtasks = 1;
+	private static final int attemptNumber = 0;
+	private static final Configuration jobConfiguration = new Configuration();
+	private static final Configuration taskConfiguration = new Configuration();
+	private static final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
+	private static final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
+	private static final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
+	private static final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
+	private static final List<URL> requiredClasspaths = new ArrayList<>(0);
+	private static final int targetSlotNumber = 47;
+	private static final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
+	private static final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
+
+	private final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
+	private final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
+		jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
+	private final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
+		vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
+
+	public TaskDeploymentDescriptorTest() throws IOException {}
+
 	@Test
-	public void testSerialization() {
-		try {
-			final JobID jobID = new JobID();
-			final JobVertexID vertexID = new JobVertexID();
-			final ExecutionAttemptID execId = new ExecutionAttemptID();
-			final AllocationID allocationId = new AllocationID();
-			final String jobName = "job name";
-			final String taskName = "task name";
-			final int numberOfKeyGroups = 1;
-			final int indexInSubtaskGroup = 0;
-			final int currentNumberOfSubtasks = 1;
-			final int attemptNumber = 0;
-			final Configuration jobConfiguration = new Configuration();
-			final Configuration taskConfiguration = new Configuration();
-			final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
-			final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
-			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
-			final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
-			final List<URL> requiredClasspaths = new ArrayList<>(0);
-			final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
-			final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
-				jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
-			final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
-				vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
-			final int targetSlotNumber = 47;
-			final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
-			final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
-
-			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(
-				jobID,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
-				execId,
-				allocationId,
-				indexInSubtaskGroup,
-				attemptNumber,
-				targetSlotNumber,
-				taskRestore,
-				producedResults,
-				inputGates);
-
-			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
-
-			assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
-			assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
-			assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
-			assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
-			assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
-			assertFalse(orig.getInputGates() == copy.getInputGates());
-
-			assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
-			assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
-			assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
-			assertEquals(orig.getAllocationId(), copy.getAllocationId());
-			assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
-			assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
-			assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
-			assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
-			assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
-			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
-			assertEquals(orig.getInputGates(), copy.getInputGates());
-
-			final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
-				jobID,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()),
-				execId,
-				allocationId,
-				indexInSubtaskGroup,
-				attemptNumber,
-				targetSlotNumber,
-				taskRestore,
-				producedResults,
-				inputGates);
-			try {
-				testOffLoadedTaskInformation.getSerializedTaskInformation();
-			} catch (Exception e) {
-				assertTrue(e instanceof IllegalStateException);
-			}
+	public void testSerialization() throws Exception {
+		final TaskDeploymentDescriptor orig = createTaskDeploymentDescriptor(
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation));
 
+		final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
+
+		assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
+		assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
+		assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
+		assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
+		assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
+		assertFalse(orig.getInputGates() == copy.getInputGates());
+
+		assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
+		assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
+		assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
+		assertEquals(orig.getAllocationId(), copy.getAllocationId());
+		assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
+		assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
+		assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
+		assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
+		assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
+		assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
+		assertEquals(orig.getInputGates(), copy.getInputGates());
+	}
+
+	@Test
+	public void testOffLoadedAndNonOffLoadedPayload() {
+		final TaskDeploymentDescriptor taskDeploymentDescriptor = createTaskDeploymentDescriptor(
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+			new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()));
+
+		SerializedValue<JobInformation> actualSerializedJobInformation = taskDeploymentDescriptor.getSerializedJobInformation();
+		assertThat(actualSerializedJobInformation, is(serializedJobInformation));
+
+		try {
+			taskDeploymentDescriptor.getSerializedTaskInformation();
+			fail("Expected to fail since the task information should be offloaded.");
+		} catch (IllegalStateException expected) {
+			// expected
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	}
+
+	@Nonnull
+	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> jobInformation, TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInformation) {
+		return new TaskDeploymentDescriptor(
+			jobID,
+			jobInformation,
+			taskInformation,
+			execId,
+			allocationId,
+			indexInSubtaskGroup,
+			attemptNumber,
+			targetSlotNumber,
+			taskRestore,
+			producedResults,
+			inputGates);
 	}
 }