You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 08:06:57 UTC

[6/7] flink git commit: [hotfix] Expose AllocationID as string through TaskInfo

[hotfix] Expose AllocationID as string through TaskInfo


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edece9c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edece9c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edece9c1

Branch: refs/heads/master
Commit: edece9c1e2ec8e759b783ea07dcaa50d4e8704a2
Parents: 8993599
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 13 13:53:48 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/common/TaskInfo.java   | 36 ++++++++++++++++++--
 .../util/AbstractRuntimeUDFContext.java         |  7 ++++
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 3 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 33f2e0c..2583687 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
@@ -31,12 +31,35 @@ public class TaskInfo {
 
 	private final String taskName;
 	private final String taskNameWithSubtasks;
+	private final String allocationIDAsString;
 	private final int maxNumberOfParallelSubtasks;
 	private final int indexOfSubtask;
 	private final int numberOfParallelSubtasks;
 	private final int attemptNumber;
 
-	public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+	public TaskInfo(
+		String taskName,
+		int maxNumberOfParallelSubtasks,
+		int indexOfSubtask,
+		int numberOfParallelSubtasks,
+		int attemptNumber) {
+		this(
+			taskName,
+			maxNumberOfParallelSubtasks,
+			indexOfSubtask,
+			numberOfParallelSubtasks,
+			attemptNumber,
+			"UNKNOWN");
+	}
+
+	public TaskInfo(
+		String taskName,
+		int maxNumberOfParallelSubtasks,
+		int indexOfSubtask,
+		int numberOfParallelSubtasks,
+		int attemptNumber,
+		String allocationIDAsString) {
+
 		checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
 		checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
 		checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
@@ -49,6 +72,7 @@ public class TaskInfo {
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.attemptNumber = attemptNumber;
 		this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')';
+		this.allocationIDAsString = checkNotNull(allocationIDAsString);
 	}
 
 	/**
@@ -107,4 +131,12 @@ public class TaskInfo {
 	public String getTaskNameWithSubtasks() {
 		return this.taskNameWithSubtasks;
 	}
+
+	/**
+	 * Returns the allocation id for where this task is executed.
+	 * @return the allocation id for where this task is executed.
+	 */
+	public String getAllocationIDAsString() {
+		return allocationIDAsString;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6246e80..d6262c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.functions.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -239,4 +240,10 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
 	}
+
+	@Internal
+	@VisibleForTesting
+	public String getAllocationIDAsString() {
+		return taskInfo.getAllocationIDAsString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edece9c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f7ed231..f8aa0e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -310,7 +310,8 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 				taskInformation.getMaxNumberOfSubtaks(),
 				subtaskIndex,
 				taskInformation.getNumberOfSubtasks(),
-				attemptNumber);
+				attemptNumber,
+				String.valueOf(slotAllocationId));
 
 		this.jobId = jobInformation.getJobId();
 		this.vertexId = taskInformation.getJobVertexId();