You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 14:48:42 UTC
[flink] 01/02: [FLINK-11389] Fix Incorrectly use job information
when call getSerializedTaskInformation in class TaskDeploymentDescriptor
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 446145f5ca0cd92c64a6944c19ca1ab53104030f
Author: park.yq <pa...@alibaba-inc.com>
AuthorDate: Fri Jan 18 16:40:30 2019 +0800
[FLINK-11389] Fix Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor
---
.../runtime/deployment/TaskDeploymentDescriptor.java | 6 +++---
.../deployment/TaskDeploymentDescriptorTest.java | 20 ++++++++++++++++++++
2 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 4f5b231..bb038eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -208,10 +208,10 @@ public final class TaskDeploymentDescriptor implements Serializable {
*/
@Nullable
public SerializedValue<TaskInformation> getSerializedTaskInformation() {
- if (serializedJobInformation instanceof NonOffloaded) {
- NonOffloaded<TaskInformation> jobInformation =
+ if (serializedTaskInformation instanceof NonOffloaded) {
+ NonOffloaded<TaskInformation> taskInformation =
(NonOffloaded<TaskInformation>) serializedTaskInformation;
- return jobInformation.serializedValue;
+ return taskInformation.serializedValue;
} else {
throw new IllegalStateException(
"Trying to work with offloaded serialized job information.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index e20d34b..22e943b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -42,6 +42,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -110,6 +111,25 @@ public class TaskDeploymentDescriptorTest {
assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
assertEquals(orig.getInputGates(), copy.getInputGates());
+
+ final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
+ jobID,
+ new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+ new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()),
+ execId,
+ allocationId,
+ indexInSubtaskGroup,
+ attemptNumber,
+ targetSlotNumber,
+ taskRestore,
+ producedResults,
+ inputGates);
+ try {
+ testOffLoadedTaskInformation.getSerializedTaskInformation();
+ } catch (Exception e) {
+ assertTrue(e instanceof IllegalStateException);
+ }
+
}
catch (Exception e) {
e.printStackTrace();