You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/16 21:57:02 UTC
helix git commit: [HELIX-732] Expose UserContentStore in TaskDriver
Repository: helix
Updated Branches:
refs/heads/master 343f8dd33 -> 3ec93129e
[HELIX-732] Expose UserContentStore in TaskDriver
This feature was requested by a user. The intended use is aggregation work that reads temporary data written by tasks, which this change supports by exposing a get() of UserContentStore content at the TaskDriver level. UserContentStore is a potentially useful feature that is currently under-utilized; this change will enable Gobblin and other users of the Task Framework to make better use of it.
Changelist:
1. Add getUserContent() to TaskDriver
2. Add TestGetUserContentStore, an integration test for this feature
3. Add JavaDoc warning users that the getUserContent() and putUserContent() methods of UserContentStore are not thread-safe
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3ec93129
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3ec93129
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3ec93129
Branch: refs/heads/master
Commit: 3ec93129e717185cd1db0fc40d09e7603d8aee5d
Parents: 343f8dd
Author: Hunter Lee <na...@gmail.com>
Authored: Mon Jul 16 12:18:03 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Mon Jul 16 14:56:34 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 17 ++-
.../java/org/apache/helix/task/TaskUtil.java | 48 +++++--
.../org/apache/helix/task/UserContentStore.java | 53 ++++---
.../helix/task/TestGetUserContentStore.java | 144 +++++++++++++++++++
4 files changed, 220 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index bc767b7..4fe732b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -988,6 +988,21 @@ public class TaskDriver {
}
/**
+ * Returns the lookup of UserContentStore by key.
+ * @param key key used at write time by a task implementing UserContentStore
+ * @param scope scope used at write time
+ * @param workflowName name of workflow. Must be supplied
+ * @param jobName name of job. Optional if scope is WORKFLOW
+ * @param taskName name of task. Optional if scope is WORKFLOW or JOB
+ * @return null if key-value pair not found or this content store does not exist. Otherwise,
+ * return a String
+ */
+ public String getUserContent(String key, UserContentStore.Scope scope, String workflowName,
+ String jobName, String taskName) {
+ return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, jobName, taskName);
+ }
+
+ /**
* Throw Exception if children nodes will exceed limitation after adding newNodesCount children.
* @param newConfigNodeCount
*/
@@ -999,4 +1014,4 @@ public class TaskDriver {
"Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 68a9d4a..9ca0062 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -309,17 +309,17 @@ public class TaskUtil {
}
/**
- * Get user defined workflow or job level key-value pair data
- * @param manager a connection to Helix
- * @param workflowJobResource the name of workflow
- * @param key the key of key-value pair
+ * Get user-defined workflow/job scope key-value pair data. This method takes
+ * HelixPropertyStore<ZNRecord>.
+ * @param propertyStore
+ * @param workflowJobResource
+ * @param key
* @return null if there is no such pair, otherwise return a String
*/
- protected static String getWorkflowJobUserContent(HelixManager manager,
+ protected static String getWorkflowJobUserContent(HelixPropertyStore<ZNRecord> propertyStore,
String workflowJobResource, String key) {
- ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
- AccessOption.PERSISTENT);
+ ZNRecord r = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
+ workflowJobResource, USER_CONTENT_NODE), null, AccessOption.PERSISTENT);
return r != null ? r.getSimpleField(key) : null;
}
@@ -346,15 +346,15 @@ public class TaskUtil {
/**
* Get user defined task level key-value pair data
- * @param manager a connection to Helix
+ * @param propertyStore
* @param job the name of job
* @param task the name of the task
* @param key the key of key-value pair
* @return null if there is no such pair, otherwise return a String
*/
- protected static String getTaskUserContent(HelixManager manager, String job, String task,
- String key) {
- ZNRecord r = manager.getHelixPropertyStore().get(
+ protected static String getTaskUserContent(HelixPropertyStore<ZNRecord> propertyStore, String job,
+ String task, String key) {
+ ZNRecord r = propertyStore.get(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
AccessOption.PERSISTENT);
return r != null ? (r.getMapField(task) != null ? r.getMapField(task).get(key) : null) : null;
@@ -386,6 +386,30 @@ public class TaskUtil {
}
/**
+ * Helper method for looking up UserContentStore content.
+ * @param propertyStore
+ * @param key
+ * @param scope
+ * @param workflowName
+ * @param jobName
+ * @param taskName
+ * @return value corresponding to the key
+ */
+ protected static String getUserContent(HelixPropertyStore propertyStore, String key,
+ UserContentStore.Scope scope, String workflowName, String jobName, String taskName) {
+ switch (scope) {
+ case WORKFLOW:
+ return TaskUtil.getWorkflowJobUserContent(propertyStore, workflowName, key);
+ case JOB:
+ return TaskUtil.getWorkflowJobUserContent(propertyStore, jobName, key);
+ case TASK:
+ return TaskUtil.getTaskUserContent(propertyStore, jobName, taskName, key);
+ default:
+ throw new HelixException("Invalid scope : " + scope.name());
+ }
+ }
+
+ /**
* Get a workflow-qualified job name for a single-job workflow
* @param singleJobWorkflow the name of the single-job workflow
* @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow
http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
index b188be7..ba80e88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
+++ b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
@@ -25,7 +25,6 @@ import org.apache.helix.HelixManager;
/**
* UserContentStore provides default implementation of user defined key-value pair store per task,
* job and workflow level.
- *
* TODO: This class should be merged to Task interface when Helix bump up to Java 8
*/
public abstract class UserContentStore {
@@ -47,10 +46,10 @@ public abstract class UserContentStore {
TASK
}
- private HelixManager _manager;
- private String _workflowName;
- private String _jobName;
- private String _taskName;
+ protected HelixManager _manager;
+ protected String _workflowName;
+ protected String _jobName;
+ protected String _taskName;
/**
* Default initialization of user content store
@@ -67,44 +66,40 @@ public abstract class UserContentStore {
}
/**
- * Default implementation for user defined put key-value pair
+ * Default implementation for user defined put key-value pair. Warning: this method is not
+ * thread-safe - we recommend creating a different key-value pair instead of modifying the value
+ * on the same key.
* @param key The key of key-value pair
* @param value The value of key-value pair
* @param scope The scope defines which layer to store
*/
public void putUserContent(String key, String value, Scope scope) {
switch (scope) {
- case WORKFLOW:
- TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value);
- break;
- case JOB:
- TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value);
- break;
- case TASK:
- TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value);
- break;
- default:
- throw new HelixException("Invalid scope : " + scope.name());
+ case WORKFLOW:
+ TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value);
+ break;
+ case JOB:
+ TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value);
+ break;
+ case TASK:
+ TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value);
+ break;
+ default:
+ throw new HelixException("Invalid scope : " + scope.name());
}
}
/**
- * Default implementation for user defined get key-value pair
+ * Default implementation for user defined get key-value pair. Warning: this method is not
+ * thread-safe - we recommend creating a different key-value pair instead of modifying the value
+ * on the same key.
* @param key The key of key-value pair
* @param scope The scope defines which layer that key-value pair stored
* @return Null if key-value pair not found or this content store does not exists. Otherwise,
* return a String
*/
public String getUserContent(String key, Scope scope) {
- switch (scope) {
- case WORKFLOW:
- return TaskUtil.getWorkflowJobUserContent(_manager, _workflowName, key);
- case JOB:
- return TaskUtil.getWorkflowJobUserContent(_manager, _jobName, key);
- case TASK:
- return TaskUtil.getTaskUserContent(_manager, _jobName, _taskName, key);
- default:
- throw new HelixException("Invalid scope : " + scope.name());
- }
+ return TaskUtil.getUserContent(_manager.getHelixPropertyStore(), key, scope, _workflowName,
+ _jobName, _taskName);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
new file mode 100644
index 0000000..392c278
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
@@ -0,0 +1,144 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestGetUserContentStore extends TaskTestBase {
+ private static final String JOB_COMMAND = "DummyCommand";
+ private Map<String, String> _jobCommandMap;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _participants = new MockParticipantManager[_numNodes];
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursively(namespace);
+ }
+
+ // Setup cluster and instances
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ setupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < _numNodes; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+ setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ // start dummy participants
+ for (int i = 0; i < _numNodes; i++) {
+ final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+
+ // Set task callbacks
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+ TaskFactory shortTaskFactory = new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new WriteTask(context);
+ }
+ };
+ taskFactoryReg.put("WriteTask", shortTaskFactory);
+
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task",
+ new TaskStateModelFactory(_participants[i], taskFactoryReg));
+ _participants[i].syncStart();
+ }
+
+ // Start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // Start an admin connection
+ _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+ InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+ _driver = new TaskDriver(_manager);
+
+ _jobCommandMap = new HashMap<>();
+ }
+
+ @Test
+ public void testGetUserContentStore() throws InterruptedException {
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+ WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
+ configBuilder.setAllowOverlapJobAssignment(true);
+ workflowBuilder.setWorkflowConfig(configBuilder.build());
+
+ List<String> jobsThatRan = new ArrayList<>();
+ // Create 5 jobs with 1 WriteTask each
+ for (int i = 0; i < 5; i++) {
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+ taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>()));
+ JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND)
+ .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
+ workflowBuilder.addJob("JOB" + i, jobConfigBulider);
+ jobsThatRan.add(workflowName + "_JOB" + i);
+ }
+
+ // Start the workflow and wait until completion
+ _driver.start(workflowBuilder.build());
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ // Aggregate key-value mappings in UserContentStore
+ int runCount = 0;
+ for (String jobName : jobsThatRan) {
+ String value = _driver.getUserContent(jobName, UserContentStore.Scope.WORKFLOW, workflowName,
+ jobName, null);
+ runCount += Integer.parseInt(value);
+ }
+ Assert.assertEquals(runCount, 5);
+ }
+
+ /**
+ * A mock task that writes to UserContentStore. MockTask extends UserContentStore.
+ */
+ private class WriteTask extends MockTask {
+
+ public WriteTask(TaskCallbackContext context) {
+ super(context);
+ }
+
+ @Override
+ public TaskResult run() {
+ putUserContent(_jobName, Integer.toString(1), Scope.WORKFLOW);
+ return new TaskResult(TaskResult.Status.COMPLETED, "");
+ }
+ }
+}