You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/25 07:38:28 UTC
[flink] branch master updated: [FLINK-17300] Log the lineage
information between ExecutionAttemptID and AllocationID
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7e48549 [FLINK-17300] Log the lineage information between ExecutionAttemptID and AllocationID
7e48549 is described below
commit 7e48549b822bc54f8dc0a5e1f9cbb5f3156fda06
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Apr 22 14:14:29 2020 +0800
[FLINK-17300] Log the lineage information between ExecutionAttemptID and AllocationID
This closes #11852.
---
.../java/org/apache/flink/runtime/executiongraph/Execution.java | 6 ++----
.../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 3 ++-
.../org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java | 2 +-
3 files changed, 5 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index f6702f8..56415e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -728,10 +728,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
return;
}
- if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
- attemptNumber, getAssignedResourceLocation()));
- }
+ LOG.info("Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(),
+ attemptNumber, vertex.getCurrentExecutionAttempt().getAttemptId(), getAssignedResourceLocation(), slot.getAllocationId());
final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
.fromExecutionVertex(vertex, attemptNumber)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f8438f5..bd2834a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -643,7 +643,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);
- log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
+ log.info("Received task {} ({}), deploy into slot with allocation id {}.",
+ task.getTaskInfo().getTaskNameWithSubtasks(), tdd.getExecutionAttemptId(), tdd.getAllocationId());
boolean taskAdded;
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index af0f532..aca45b1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -573,7 +573,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
String expected = "Starting TaskManagers";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'",
content.contains(expected));
- expected = " (2/2) (attempt #0) to ";
+ expected = " (2/2) (attempt #0) with attempt id ";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '" + jobmanagerLog + "'",
content.contains(expected));