You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/25 07:38:28 UTC

[flink] branch master updated: [FLINK-17300] Log the lineage information between ExecutionAttemptID and AllocationID

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e48549  [FLINK-17300] Log the lineage information between ExecutionAttemptID and AllocationID
7e48549 is described below

commit 7e48549b822bc54f8dc0a5e1f9cbb5f3156fda06
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Apr 22 14:14:29 2020 +0800

    [FLINK-17300] Log the lineage information between ExecutionAttemptID and AllocationID
    
    This closes #11852.
---
 .../java/org/apache/flink/runtime/executiongraph/Execution.java     | 6 ++----
 .../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java    | 3 ++-
 .../org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java   | 2 +-
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index f6702f8..56415e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -728,10 +728,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				return;
 			}
 
-			if (LOG.isInfoEnabled()) {
-				LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
-						attemptNumber, getAssignedResourceLocation()));
-			}
+			LOG.info("Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(),
+				attemptNumber, vertex.getCurrentExecutionAttempt().getAttemptId(), getAssignedResourceLocation(), slot.getAllocationId());
 
 			final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
 				.fromExecutionVertex(vertex, attemptNumber)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f8438f5..bd2834a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -643,7 +643,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 			taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);
 
-			log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
+			log.info("Received task {} ({}), deploy into slot with allocation id {}.",
+				task.getTaskInfo().getTaskNameWithSubtasks(), tdd.getExecutionAttemptId(), tdd.getAllocationId());
 
 			boolean taskAdded;
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index af0f532..aca45b1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -573,7 +573,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			String expected = "Starting TaskManagers";
 			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'",
 				content.contains(expected));
-			expected = " (2/2) (attempt #0) to ";
+			expected = " (2/2) (attempt #0) with attempt id ";
 			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
 					"This string checks that the job has been started with a parallelism of 2. Log contents: '" + jobmanagerLog + "'",
 				content.contains(expected));