You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 14:50:45 UTC

[flink] branch release-1.6 updated (1b9c464 -> ea90666)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1b9c464  [FLINK-10774][tests] Refactor FlinkKafkaConsumerBaseTest#testConsumerLifeCycle
     new 941ed4d  [FLINK-11389] Fix Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor
     new ea90666  [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../deployment/TaskDeploymentDescriptor.java       |   6 +-
 .../deployment/TaskDeploymentDescriptorTest.java   | 158 ++++++++++++---------
 2 files changed, 96 insertions(+), 68 deletions(-)


[flink] 02/02: [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea90666b4415d1a7f510053b35101880438eab40
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 31 11:39:55 2019 +0100

    [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest
    
    This closes #7532.
---
 .../deployment/TaskDeploymentDescriptorTest.java   | 178 +++++++++++----------
 1 file changed, 93 insertions(+), 85 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 22e943b..a617ce1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -33,107 +33,115 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link TaskDeploymentDescriptor}.
  */
-public class TaskDeploymentDescriptorTest {
+public class TaskDeploymentDescriptorTest extends TestLogger {
+
+	private static final JobID jobID = new JobID();
+	private static final JobVertexID vertexID = new JobVertexID();
+	private static final ExecutionAttemptID execId = new ExecutionAttemptID();
+	private static final AllocationID allocationId = new AllocationID();
+	private static final String jobName = "job name";
+	private static final String taskName = "task name";
+	private static final int numberOfKeyGroups = 1;
+	private static final int indexInSubtaskGroup = 0;
+	private static final int currentNumberOfSubtasks = 1;
+	private static final int attemptNumber = 0;
+	private static final Configuration jobConfiguration = new Configuration();
+	private static final Configuration taskConfiguration = new Configuration();
+	private static final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
+	private static final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
+	private static final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
+	private static final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
+	private static final List<URL> requiredClasspaths = new ArrayList<>(0);
+	private static final int targetSlotNumber = 47;
+	private static final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
+	private static final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
+
+	private final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
+	private final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
+		jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
+	private final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
+		vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
+
+	public TaskDeploymentDescriptorTest() throws IOException {}
+
 	@Test
-	public void testSerialization() {
-		try {
-			final JobID jobID = new JobID();
-			final JobVertexID vertexID = new JobVertexID();
-			final ExecutionAttemptID execId = new ExecutionAttemptID();
-			final AllocationID allocationId = new AllocationID();
-			final String jobName = "job name";
-			final String taskName = "task name";
-			final int numberOfKeyGroups = 1;
-			final int indexInSubtaskGroup = 0;
-			final int currentNumberOfSubtasks = 1;
-			final int attemptNumber = 0;
-			final Configuration jobConfiguration = new Configuration();
-			final Configuration taskConfiguration = new Configuration();
-			final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
-			final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
-			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
-			final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
-			final List<URL> requiredClasspaths = new ArrayList<>(0);
-			final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
-			final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
-				jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
-			final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
-				vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
-			final int targetSlotNumber = 47;
-			final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
-			final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
-
-			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(
-				jobID,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
-				execId,
-				allocationId,
-				indexInSubtaskGroup,
-				attemptNumber,
-				targetSlotNumber,
-				taskRestore,
-				producedResults,
-				inputGates);
-
-			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
-
-			assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
-			assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
-			assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
-			assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
-			assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
-			assertFalse(orig.getInputGates() == copy.getInputGates());
-
-			assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
-			assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
-			assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
-			assertEquals(orig.getAllocationId(), copy.getAllocationId());
-			assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
-			assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
-			assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
-			assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
-			assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
-			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
-			assertEquals(orig.getInputGates(), copy.getInputGates());
-
-			final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
-				jobID,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()),
-				execId,
-				allocationId,
-				indexInSubtaskGroup,
-				attemptNumber,
-				targetSlotNumber,
-				taskRestore,
-				producedResults,
-				inputGates);
-			try {
-				testOffLoadedTaskInformation.getSerializedTaskInformation();
-			} catch (Exception e) {
-				assertTrue(e instanceof IllegalStateException);
-			}
+	public void testSerialization() throws Exception {
+		final TaskDeploymentDescriptor orig = createTaskDeploymentDescriptor(
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation));
 
+		final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
+
+		assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
+		assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
+		assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
+		assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
+		assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
+		assertFalse(orig.getInputGates() == copy.getInputGates());
+
+		assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
+		assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
+		assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
+		assertEquals(orig.getAllocationId(), copy.getAllocationId());
+		assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
+		assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
+		assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
+		assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
+		assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
+		assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
+		assertEquals(orig.getInputGates(), copy.getInputGates());
+	}
+
+	@Test
+	public void testOffLoadedAndNonOffLoadedPayload() {
+		final TaskDeploymentDescriptor taskDeploymentDescriptor = createTaskDeploymentDescriptor(
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+			new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()));
+
+		SerializedValue<JobInformation> actualSerializedJobInformation = taskDeploymentDescriptor.getSerializedJobInformation();
+		assertThat(actualSerializedJobInformation, is(serializedJobInformation));
+
+		try {
+			taskDeploymentDescriptor.getSerializedTaskInformation();
+			fail("Expected to fail since the task information should be offloaded.");
+		} catch (IllegalStateException expected) {
+			// expected
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	}
+
+	@Nonnull
+	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> jobInformation, TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInformation) {
+		return new TaskDeploymentDescriptor(
+			jobID,
+			jobInformation,
+			taskInformation,
+			execId,
+			allocationId,
+			indexInSubtaskGroup,
+			attemptNumber,
+			targetSlotNumber,
+			taskRestore,
+			producedResults,
+			inputGates);
 	}
 }


[flink] 01/02: [FLINK-11389] Fix Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 941ed4d816489990cd1a420d90b88f167a89db1a
Author: park.yq <pa...@alibaba-inc.com>
AuthorDate: Fri Jan 18 16:40:30 2019 +0800

    [FLINK-11389] Fix Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor
---
 .../runtime/deployment/TaskDeploymentDescriptor.java |  6 +++---
 .../deployment/TaskDeploymentDescriptorTest.java     | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 4f5b231..bb038eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -208,10 +208,10 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 */
 	@Nullable
 	public SerializedValue<TaskInformation> getSerializedTaskInformation() {
-		if (serializedJobInformation instanceof NonOffloaded) {
-			NonOffloaded<TaskInformation> jobInformation =
+		if (serializedTaskInformation instanceof NonOffloaded) {
+			NonOffloaded<TaskInformation> taskInformation =
 				(NonOffloaded<TaskInformation>) serializedTaskInformation;
-			return jobInformation.serializedValue;
+			return taskInformation.serializedValue;
 		} else {
 			throw new IllegalStateException(
 				"Trying to work with offloaded serialized job information.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index e20d34b..22e943b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -42,6 +42,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -110,6 +111,25 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
 			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
 			assertEquals(orig.getInputGates(), copy.getInputGates());
+
+			final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
+				jobID,
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+				new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()),
+				execId,
+				allocationId,
+				indexInSubtaskGroup,
+				attemptNumber,
+				targetSlotNumber,
+				taskRestore,
+				producedResults,
+				inputGates);
+			try {
+				testOffLoadedTaskInformation.getSerializedTaskInformation();
+			} catch (Exception e) {
+				assertTrue(e instanceof IllegalStateException);
+			}
+
 		}
 		catch (Exception e) {
 			e.printStackTrace();