You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/31 20:57:56 UTC
helix git commit: [HELIX-775] add task driver support for helix rest
to add/get task framework user content
Repository: helix
Updated Branches:
refs/heads/master 1103fecb6 -> 7ec5313bc
[HELIX-775] add task driver support for helix rest to add/get task framework user content
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7ec5313b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7ec5313b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7ec5313b
Branch: refs/heads/master
Commit: 7ec5313bccb679014d6a0605ee5d7184063e555e
Parents: 1103fec
Author: Harry Zhang <hr...@linkedin.com>
Authored: Wed Oct 31 13:55:44 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Wed Oct 31 13:55:44 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 35 ++++++++++++++
.../java/org/apache/helix/task/TaskUtil.java | 50 +++++++++++++++++---
2 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7ec5313b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 54e3ab3..25a4fe4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -1029,6 +1029,41 @@ public class TaskDriver {
}
/**
+ * Return the full user content map for workflow
+ * @param workflowName workflow name
+ * @return user content map
+ */
+ public Map<String, String> getWorkflowUserContentMap(String workflowName) {
+ return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, workflowName);
+ }
+
+ /**
+ * Return full user content map for job
+ * @param workflowName workflow name
+ * @param jobName Un-namespaced job name
+ * @return user content map
+ */
+ public Map<String, String> getJobUserContentMap(String workflowName, String jobName) {
+ String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, namespacedJobName);
+ }
+
+ /**
+ * Return full user content map for task
+ * @param workflowName workflow name
+ * @param jobName Un-namespaced job name
+ * @param taskPartitionId task partition id
+ * @return user content map
+ */
+ public Map<String, String> getTaskContentMap(String workflowName, String jobName, String taskPartitionId) {
+ String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
+ return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName);
+ }
+
+
+
+ /**
* Set user content defined by the given key and string
* @param key content key
* @param value content value
http://git-wip-us.apache.org/repos/asf/helix/blob/7ec5313b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 3461233..5581b6f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
@@ -320,9 +319,22 @@ public class TaskUtil {
*/
protected static String getWorkflowJobUserContent(HelixPropertyStore<ZNRecord> propertyStore,
String workflowJobResource, String key) {
- ZNRecord r = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
- workflowJobResource, USER_CONTENT_NODE), null, AccessOption.PERSISTENT);
- return r != null ? r.getSimpleField(key) : null;
+ Map<String, String> userContentMap = getWorkflowJobUserContentMap(propertyStore, workflowJobResource);
+ return userContentMap != null ? userContentMap.get(key) : null;
+ }
+
+ /**
+ * get workflow/job user content map
+ * @param propertyStore property store
+ * @param workflowJobResource workflow name or namespaced job name
+ * @return user content map
+ */
+ protected static Map<String, String> getWorkflowJobUserContentMap(
+ HelixPropertyStore<ZNRecord> propertyStore, String workflowJobResource) {
+ ZNRecord record = propertyStore.get(Joiner.on("/")
+ .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
+ AccessOption.PERSISTENT);
+ return record != null ? record.getSimpleFields() : null;
}
/**
@@ -365,10 +377,24 @@ public class TaskUtil {
*/
protected static String getTaskUserContent(HelixPropertyStore<ZNRecord> propertyStore, String job,
String task, String key) {
- ZNRecord r = propertyStore.get(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
+ Map<String, String> userContentStore = getTaskUserContentMap(propertyStore, job, task);
+ return userContentStore != null ? userContentStore.get(key) : null;
+ }
+
+ /**
+ * Return full task user content map
+ * @param propertyStore property store
+ * @param namespacedJobName namespaced job name
+ * @param taskPartitionId key of the task's map field in the user content record (TaskDriver passes the namespaced task name here)
+ * @return user content map of the task, or null if the user content record does not exist
+ */
+ protected static Map<String, String> getTaskUserContentMap(
+ HelixPropertyStore<ZNRecord> propertyStore, String namespacedJobName,
+ String taskPartitionId) {
+ ZNRecord record = propertyStore.get(Joiner.on("/")
+ .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName, USER_CONTENT_NODE), null,
AccessOption.PERSISTENT);
- return r != null ? (r.getMapField(task) != null ? r.getMapField(task).get(key) : null) : null;
+ return record != null ? record.getMapField(taskPartitionId) : null;
}
/**
@@ -449,6 +475,16 @@ public class TaskUtil {
}
/**
+ * Get a task name, namespaced by its job and workflow
+ * @param namespacedJobName namespaced job name
+ * @param taskPartitionId task partition id
+ * @return namespaced task name, formatted as namespacedJobName + "_" + taskPartitionId
+ */
+ public static String getNamespacedTaskName(String namespacedJobName, String taskPartitionId) {
+ return String.format("%s_%s", namespacedJobName, taskPartitionId);
+ }
+
+ /**
* Remove the workflow namespace from the job name
* @param workflow the name of the workflow that owns the job
* @param jobName the namespaced job name