You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/31 21:11:55 UTC
helix git commit: [HELIX-775] consolidate user content related apis
for task driver
Repository: helix
Updated Branches:
refs/heads/master 7ec5313bc -> b235c4ee5
[HELIX-775] consolidate user content related apis for task driver
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b235c4ee
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b235c4ee
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b235c4ee
Branch: refs/heads/master
Commit: b235c4ee5a82c5970d29e839317ea242813a58bc
Parents: 7ec5313
Author: Harry Zhang <hr...@linkedin.com>
Authored: Thu Oct 4 11:25:08 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Wed Oct 31 14:03:37 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 64 +++++++++++++-------
.../java/org/apache/helix/task/TaskUtil.java | 18 +++---
.../helix/task/TestGetSetUserContentStore.java | 53 +++++++++-------
3 files changed, 83 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 25a4fe4..e675c86 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -1022,7 +1022,12 @@ public class TaskDriver {
* @param taskName name of task. Optional if scope is WORKFLOW or JOB
* @return null if key-value pair not found or this content store does not exist. Otherwise,
* return a String
+ *
+ * @deprecated use the following equivalents: {@link #getWorkflowUserContentMap(String)},
+ * {@link #getJobUserContentMap(String, String)},
+ * @{{@link #getTaskContentMap(String, String, String)}}
*/
+ @Deprecated
public String getUserContent(String key, UserContentStore.Scope scope, String workflowName,
String jobName, String taskName) {
return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, jobName, taskName);
@@ -1055,36 +1060,53 @@ public class TaskDriver {
* @param taskPartitionId task partition id
* @return user content map
*/
- public Map<String, String> getTaskContentMap(String workflowName, String jobName, String taskPartitionId) {
+ public Map<String, String> getTaskUserContentMap(String workflowName, String jobName,
+ String taskPartitionId) {
String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName);
}
+ /**
+ * Add or update workflow user content with the given map - new keys will be added, and old
+ * keys will be updated
+ * @param workflowName workflow name
+ * @param contentToAddOrUpdate map containing items to add or update
+ */
+ public void addOrUpdateWorkflowUserContentMap(String workflowName,
+ final Map<String, String> contentToAddOrUpdate) {
+ TaskUtil
+ .addOrUpdateWorkflowJobUserContentMap(_propertyStore, workflowName, contentToAddOrUpdate);
+ }
+ /**
+ * Add or update job user content with the given map - new keys will be added, and old keys will
+ * be updated
+ * @param workflowName workflow name
+ * @param jobName Un-namespaced job name
+ * @param contentToAddOrUpdate map containing items to add or update
+ */
+ public void addOrUpdateJobUserContentMap(String workflowName, String jobName,
+ final Map<String, String> contentToAddOrUpdate) {
+ String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, namespacedJobName,
+ contentToAddOrUpdate);
+ }
/**
- * Set user content defined by the given key and string
- * @param key content key
- * @param value content value
- * @param workflowName name of the workflow - must provide when scope is WORKFLOW
- * @param jobName name of the job - must provide when scope is JOB or TASK
- * @param taskName name of the task - must provide when scope is TASK
- * @param scope scope of the content
+ * Add or update task user content with the given map - new keys will be added, and old keys
+ * will be updated
+ * @param workflowName workflow name
+ * @param jobName Un-namespaced job name
+ * @param taskPartitionId task partition id
+ * @param contentToAddOrUpdate map containing items to add or update
*/
- public void addUserContent(String key, String value, String workflowName, String jobName, String taskName,
- UserContentStore.Scope scope) {
- switch (scope) {
- case WORKFLOW:
- TaskUtil.addWorkflowJobUserContent(_propertyStore, workflowName, key, value);
- break;
- case JOB:
- TaskUtil.addWorkflowJobUserContent(_propertyStore, jobName, key, value);
- break;
- default:
- TaskUtil.addTaskUserContent(_propertyStore, jobName, taskName, key, value);
- break;
- }
+ public void addOrUpdateTaskUserContentMap(String workflowName, String jobName,
+ String taskPartitionId, final Map<String, String> contentToAddOrUpdate) {
+ String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
+ TaskUtil.addOrUpdateTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName,
+ contentToAddOrUpdate);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 5581b6f..379026d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -346,12 +346,13 @@ public class TaskUtil {
*/
protected static void addWorkflowJobUserContent(final HelixManager manager,
String workflowJobResource, final String key, final String value) {
- addWorkflowJobUserContent(manager.getHelixPropertyStore(), workflowJobResource, key, value);
+ addOrUpdateWorkflowJobUserContentMap(manager.getHelixPropertyStore(), workflowJobResource,
+ Collections.singletonMap(key, value));
}
/* package */
- static void addWorkflowJobUserContent(final HelixPropertyStore<ZNRecord> propertyStore,
- String workflowJobResource, final String key, final String value) {
+ static void addOrUpdateWorkflowJobUserContentMap(final HelixPropertyStore<ZNRecord> propertyStore,
+ String workflowJobResource, final Map<String, String> contentToAddOrUpdate) {
if (workflowJobResource == null) {
throw new IllegalArgumentException("workflowJobResource must be not null when adding workflow / job user content");
}
@@ -361,7 +362,7 @@ public class TaskUtil {
propertyStore.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord znRecord) {
- znRecord.setSimpleField(key, value);
+ znRecord.getSimpleFields().putAll(contentToAddOrUpdate);
return znRecord;
}
}, AccessOption.PERSISTENT);
@@ -407,12 +408,13 @@ public class TaskUtil {
*/
protected static void addTaskUserContent(final HelixManager manager, String job,
final String task, final String key, final String value) {
- addTaskUserContent(manager.getHelixPropertyStore(), job, task, key, value);
+ addOrUpdateTaskUserContentMap(manager.getHelixPropertyStore(), job, task,
+ Collections.singletonMap(key, value));
}
/* package */
- static void addTaskUserContent(final HelixPropertyStore<ZNRecord> propertyStore,
- final String job, final String task, final String key, final String value) {
+ static void addOrUpdateTaskUserContentMap(final HelixPropertyStore<ZNRecord> propertyStore,
+ final String job, final String task, final Map<String, String> contentToAddOrUpdate) {
if (job == null || task == null) {
throw new IllegalArgumentException("job and task must be not null when adding task user content");
}
@@ -425,7 +427,7 @@ public class TaskUtil {
if (znRecord.getMapField(task) == null) {
znRecord.setMapField(task, new HashMap<String, String>());
}
- znRecord.getMapField(task).put(key, value);
+ znRecord.getMapField(task).putAll(contentToAddOrUpdate);
return znRecord;
}
}, AccessOption.PERSISTENT);
http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
index d4ba29a..65f6b07 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,12 +55,12 @@ public class TestGetSetUserContentStore extends TaskTestBase {
private class TaskRecord {
String workflowName;
String jobName;
- String taskName;
+ String taskPartitionId;
public TaskRecord(String workflow, String job, String task) {
workflowName = workflow;
jobName = job;
- taskName = task;
+ taskPartitionId = task;
}
}
@@ -131,11 +132,10 @@ public class TestGetSetUserContentStore extends TaskTestBase {
taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>()));
JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
.addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
- String jobSuffix = "JOB" + i;
- String jobName = workflowName + "_" + jobSuffix;
- String taskName = jobName + "_0";
- workflowBuilder.addJob("JOB" + i, jobConfigBulider);
- recordMap.put(jobName, new TaskRecord(workflowName, jobName, taskName));
+ String jobName = "JOB" + i;
+ String taskPartitionId = "0";
+ workflowBuilder.addJob(jobName, jobConfigBulider);
+ recordMap.put(jobName, new TaskRecord(workflowName, jobName, taskPartitionId));
}
// Start the workflow and wait for all tasks started
@@ -143,33 +143,40 @@ public class TestGetSetUserContentStore extends TaskTestBase {
allTasksReady.await();
// add "workflow":"workflow" to the workflow's user content
- _driver.addUserContent(workflowName, workflowName, workflowName, null, null, UserContentStore.Scope.WORKFLOW);
+ _driver.addOrUpdateWorkflowUserContentMap(workflowName,
+ Collections.singletonMap(workflowName, workflowName));
for (TaskRecord rec : recordMap.values()) {
// add "job":"job" to the job's user content
- _driver.addUserContent(rec.jobName, rec.jobName, null, rec.jobName, null, UserContentStore.Scope.JOB);
- // String taskId = _driver.getJobContext(rec.jobName).getTaskIdForPartition(0);
-
+ String namespacedJobName = TaskUtil.getNamespacedJobName(rec.workflowName, rec.jobName);
+ _driver.addOrUpdateJobUserContentMap(rec.workflowName, rec.jobName,
+ Collections.singletonMap(namespacedJobName, namespacedJobName));
+ String namespacedTaskName =
+ TaskUtil.getNamespacedTaskName(namespacedJobName, rec.taskPartitionId);
// add "taskId":"taskId" to the task's user content
- _driver.addUserContent(rec.taskName, rec.taskName, null, rec.jobName, rec.taskName, UserContentStore.Scope.TASK);
+ _driver.addOrUpdateTaskUserContentMap(rec.workflowName, rec.jobName, rec.taskPartitionId,
+ Collections.singletonMap(namespacedTaskName, namespacedTaskName));
}
adminReady.countDown();
_driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
// Aggregate key-value mappings in UserContentStore
for (TaskRecord rec : recordMap.values()) {
- Assert.assertEquals(_driver
- .getUserContent(TaskDumpResultKey.WorkflowContent.name(), UserContentStore.Scope.WORKFLOW,
- rec.workflowName, rec.jobName, rec.taskName),
+ Assert.assertEquals(_driver.getWorkflowUserContentMap(rec.workflowName)
+ .get(TaskDumpResultKey.WorkflowContent.name()),
constructContentStoreResultString(rec.workflowName, rec.workflowName));
- Assert.assertEquals(_driver
- .getUserContent(TaskDumpResultKey.JobContent.name(), UserContentStore.Scope.JOB,
- rec.workflowName, rec.jobName, rec.taskName),
- constructContentStoreResultString(rec.jobName, rec.jobName));
- Assert.assertEquals(_driver
- .getUserContent(TaskDumpResultKey.TaskContent.name(), UserContentStore.Scope.TASK,
- rec.workflowName, rec.jobName, rec.taskName),
- constructContentStoreResultString(rec.taskName, rec.taskName));
+
+ String namespacedJobName = TaskUtil.getNamespacedJobName(rec.workflowName, rec.jobName);
+ Assert.assertEquals(_driver.getJobUserContentMap(rec.workflowName, rec.jobName)
+ .get(TaskDumpResultKey.JobContent.name()),
+ constructContentStoreResultString(namespacedJobName, namespacedJobName));
+
+ String namespacedTaskName =
+ TaskUtil.getNamespacedTaskName(namespacedJobName, rec.taskPartitionId);
+ Assert.assertEquals(
+ _driver.getTaskUserContentMap(rec.workflowName, rec.jobName, rec.taskPartitionId)
+ .get(TaskDumpResultKey.TaskContent.name()),
+ constructContentStoreResultString(namespacedTaskName, namespacedTaskName));
}
}