You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 08:06:57 UTC
[6/7] flink git commit: [hotfix] Expose AllocationID as string
through TaskInfo
[hotfix] Expose AllocationID as string through TaskInfo
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edece9c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edece9c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edece9c1
Branch: refs/heads/master
Commit: edece9c1e2ec8e759b783ea07dcaa50d4e8704a2
Parents: 8993599
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 13 13:53:48 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/common/TaskInfo.java | 36 ++++++++++++++++++--
.../util/AbstractRuntimeUDFContext.java | 7 ++++
.../apache/flink/runtime/taskmanager/Task.java | 3 +-
3 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 33f2e0c..2583687 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
import org.apache.flink.annotation.Internal;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
@@ -31,12 +31,35 @@ public class TaskInfo {
private final String taskName;
private final String taskNameWithSubtasks;
+ private final String allocationIDAsString;
private final int maxNumberOfParallelSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;
- public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+ public TaskInfo(
+ String taskName,
+ int maxNumberOfParallelSubtasks,
+ int indexOfSubtask,
+ int numberOfParallelSubtasks,
+ int attemptNumber) {
+ this(
+ taskName,
+ maxNumberOfParallelSubtasks,
+ indexOfSubtask,
+ numberOfParallelSubtasks,
+ attemptNumber,
+ "UNKNOWN");
+ }
+
+ public TaskInfo(
+ String taskName,
+ int maxNumberOfParallelSubtasks,
+ int indexOfSubtask,
+ int numberOfParallelSubtasks,
+ int attemptNumber,
+ String allocationIDAsString) {
+
checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
@@ -49,6 +72,7 @@ public class TaskInfo {
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.attemptNumber = attemptNumber;
this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')';
+ this.allocationIDAsString = checkNotNull(allocationIDAsString);
}
/**
@@ -107,4 +131,12 @@ public class TaskInfo {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+ /**
+ * Returns the allocation id for where this task is executed.
+ * @return the allocation id for where this task is executed.
+ */
+ public String getAllocationIDAsString() {
+ return allocationIDAsString;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6246e80..d6262c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.functions.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -239,4 +240,10 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
+
+ @Internal
+ @VisibleForTesting
+ public String getAllocationIDAsString() {
+ return taskInfo.getAllocationIDAsString();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f7ed231..f8aa0e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -310,7 +310,8 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
taskInformation.getMaxNumberOfSubtaks(),
subtaskIndex,
taskInformation.getNumberOfSubtasks(),
- attemptNumber);
+ attemptNumber,
+ String.valueOf(slotAllocationId));
this.jobId = jobInformation.getJobId();
this.vertexId = taskInformation.getJobVertexId();