You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/31 20:57:56 UTC

helix git commit: [HELIX-775] add task driver support for helix rest to add/get task framework user content

Repository: helix
Updated Branches:
  refs/heads/master 1103fecb6 -> 7ec5313bc


[HELIX-775] add task driver support for helix rest to add/get task framework user content


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7ec5313b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7ec5313b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7ec5313b

Branch: refs/heads/master
Commit: 7ec5313bccb679014d6a0605ee5d7184063e555e
Parents: 1103fec
Author: Harry Zhang <hr...@linkedin.com>
Authored: Wed Oct 31 13:55:44 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Wed Oct 31 13:55:44 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 35 ++++++++++++++
 .../java/org/apache/helix/task/TaskUtil.java    | 50 +++++++++++++++++---
 2 files changed, 78 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7ec5313b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 54e3ab3..25a4fe4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -1029,6 +1029,41 @@ public class TaskDriver {
   }
 
   /**
+   * Return the full user content map for workflow
+   * @param workflowName workflow name
+   * @return user content map
+   */
+  public Map<String, String> getWorkflowUserContentMap(String workflowName) {
+    return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, workflowName);
+  }
+
+  /**
+   * Return full user content map for job
+   * @param workflowName workflow name
+   * @param jobName Un-namespaced job name
+   * @return user content map
+   */
+  public Map<String, String> getJobUserContentMap(String workflowName, String jobName) {
+    String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, namespacedJobName);
+  }
+
+  /**
+   * Return full user content map for task
+   * @param workflowName workflow name
+   * @param jobName Un-namespaced job name
+   * @param taskPartitionId task partition id
+   * @return user content map
+   */
+  public Map<String, String> getTaskContentMap(String workflowName, String jobName, String taskPartitionId) {
+    String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
+    return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName);
+  }
+
+
+
+  /**
    * Set user content defined by the given key and string
    * @param key content key
    * @param value content value

http://git-wip-us.apache.org/repos/asf/helix/blob/7ec5313b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 3461233..5581b6f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
@@ -320,9 +319,22 @@ public class TaskUtil {
    */
   protected static String getWorkflowJobUserContent(HelixPropertyStore<ZNRecord> propertyStore,
       String workflowJobResource, String key) {
-    ZNRecord r = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
-        workflowJobResource, USER_CONTENT_NODE), null, AccessOption.PERSISTENT);
-    return r != null ? r.getSimpleField(key) : null;
+    Map<String, String> userContentMap = getWorkflowJobUserContentMap(propertyStore, workflowJobResource);
+    return userContentMap != null ? userContentMap.get(key) : null;
+  }
+
+  /**
+   * Get the full user content map of a workflow or job
+   * @param propertyStore property store
+   * @param workflowJobResource workflow name or namespaced job name
+   * @return user content map
+   */
+  protected static Map<String, String> getWorkflowJobUserContentMap(
+      HelixPropertyStore<ZNRecord> propertyStore, String workflowJobResource) {
+    ZNRecord record = propertyStore.get(Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
+        AccessOption.PERSISTENT);
+    return record != null ? record.getSimpleFields() : null;
   }
 
   /**
@@ -365,10 +377,24 @@ public class TaskUtil {
    */
   protected static String getTaskUserContent(HelixPropertyStore<ZNRecord> propertyStore, String job,
       String task, String key) {
-    ZNRecord r = propertyStore.get(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
+    Map<String, String> userContentStore = getTaskUserContentMap(propertyStore, job, task);
+    return userContentStore != null ? userContentStore.get(key) : null;
+  }
+
+  /**
+   * Return full task user content map
+   * @param propertyStore property store
+   * @param namespacedJobName namespaced job name
+   * @param taskPartitionId map-field key of the task (TaskDriver passes the namespaced task name)
+   * @return the task's user content map, or null if the user content record does not exist
+   */
+  protected static Map<String, String> getTaskUserContentMap(
+      HelixPropertyStore<ZNRecord> propertyStore, String namespacedJobName,
+      String taskPartitionId) {
+    ZNRecord record = propertyStore.get(Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName, USER_CONTENT_NODE), null,
         AccessOption.PERSISTENT);
-    return r != null ? (r.getMapField(task) != null ? r.getMapField(task).get(key) : null) : null;
+    return record != null ? record.getMapField(taskPartitionId) : null;
   }
 
   /**
@@ -449,6 +475,16 @@ public class TaskUtil {
   }
 
   /**
+   * Get a task name, namespaced by its job and workflow
+   * @param namespacedJobName namespaced job name
+   * @param taskPartitionId task partition id
+   * @return the namespaced task name, formatted as namespacedJobName_taskPartitionId
+   */
+  public static String getNamespacedTaskName(String namespacedJobName, String taskPartitionId) {
+    return String.format("%s_%s", namespacedJobName, taskPartitionId);
+  }
+
+  /**
    * Remove the workflow namespace from the job name
    * @param workflow the name of the workflow that owns the job
    * @param jobName the namespaced job name