You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 14:48:42 UTC

[flink] 01/02: [FLINK-11389] Fix incorrect use of job information when calling getSerializedTaskInformation in class TaskDeploymentDescriptor

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 446145f5ca0cd92c64a6944c19ca1ab53104030f
Author: park.yq <pa...@alibaba-inc.com>
AuthorDate: Fri Jan 18 16:40:30 2019 +0800

    [FLINK-11389] Fix incorrect use of job information when calling getSerializedTaskInformation in class TaskDeploymentDescriptor
---
 .../runtime/deployment/TaskDeploymentDescriptor.java |  6 +++---
 .../deployment/TaskDeploymentDescriptorTest.java     | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 4f5b231..bb038eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -208,10 +208,10 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 */
 	@Nullable
 	public SerializedValue<TaskInformation> getSerializedTaskInformation() {
-		if (serializedJobInformation instanceof NonOffloaded) {
-			NonOffloaded<TaskInformation> jobInformation =
+		if (serializedTaskInformation instanceof NonOffloaded) {
+			NonOffloaded<TaskInformation> taskInformation =
 				(NonOffloaded<TaskInformation>) serializedTaskInformation;
-			return jobInformation.serializedValue;
+			return taskInformation.serializedValue;
 		} else {
 			throw new IllegalStateException(
 				"Trying to work with offloaded serialized job information.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index e20d34b..22e943b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -42,6 +42,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -110,6 +111,25 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
 			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
 			assertEquals(orig.getInputGates(), copy.getInputGates());
+
+			final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
+				jobID,
+				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+				new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()),
+				execId,
+				allocationId,
+				indexInSubtaskGroup,
+				attemptNumber,
+				targetSlotNumber,
+				taskRestore,
+				producedResults,
+				inputGates);
+			try {
+				testOffLoadedTaskInformation.getSerializedTaskInformation();
+			} catch (Exception e) {
+				assertTrue(e instanceof IllegalStateException);
+			}
+
 		}
 		catch (Exception e) {
 			e.printStackTrace();