You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/16 21:57:02 UTC

helix git commit: [HELIX-732] Expose UserContentStore in TaskDriver

Repository: helix
Updated Branches:
  refs/heads/master 343f8dd33 -> 3ec93129e


[HELIX-732] Expose UserContentStore in TaskDriver

This feature was requested by users. The intended use is aggregation: tasks write intermediate data to their UserContentStore, and the caller reads that data back through a lookup exposed at the TaskDriver level. UserContentStore is a potentially useful but currently under-utilized feature; exposing it in TaskDriver lets Gobblin and other users of Task Framework make better use of it.

Changelist:
1. Add getUserContent() to TaskDriver (backed by a new TaskUtil.getUserContent() helper)
2. Add TestGetUserContentStore, an integration test for this feature
3. Add JavaDoc warning users that UserContentStore's getUserContent() and putUserContent() methods are not thread-safe


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3ec93129
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3ec93129
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3ec93129

Branch: refs/heads/master
Commit: 3ec93129e717185cd1db0fc40d09e7603d8aee5d
Parents: 343f8dd
Author: Hunter Lee <na...@gmail.com>
Authored: Mon Jul 16 12:18:03 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Mon Jul 16 14:56:34 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |  17 ++-
 .../java/org/apache/helix/task/TaskUtil.java    |  48 +++++--
 .../org/apache/helix/task/UserContentStore.java |  53 ++++---
 .../helix/task/TestGetUserContentStore.java     | 144 +++++++++++++++++++
 4 files changed, 220 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index bc767b7..4fe732b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -988,6 +988,21 @@ public class TaskDriver {
   }
 
   /**
+   * Returns the value a task stored in its UserContentStore under the given key and scope.
+   * @param key key used at write time by a task implementing UserContentStore
+   * @param scope scope used at write time; determines which of the names below is used
+   * @param workflowName name of workflow. Required if scope is WORKFLOW
+   * @param jobName workflow-namespaced job name (e.g. wf_job). Required if scope is JOB or TASK
+   * @param taskName name of task. Required if scope is TASK
+   * @return null if key-value pair not found or this content store does not exist. Otherwise,
+   *         return a String
+   */
+  public String getUserContent(String key, UserContentStore.Scope scope, String workflowName,
+      String jobName, String taskName) {
+    return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, jobName, taskName);
+  }
+
+  /**
    * Throw Exception if children nodes will exceed limitation after adding newNodesCount children.
    * @param newConfigNodeCount
    */
@@ -999,4 +1014,4 @@ public class TaskDriver {
           "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 68a9d4a..9ca0062 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -309,17 +309,17 @@ public class TaskUtil {
   }
 
   /**
-   * Get user defined workflow or job level key-value pair data
-   * @param manager a connection to Helix
-   * @param workflowJobResource the name of workflow
-   * @param key the key of key-value pair
+   * Get user-defined workflow/job scope key-value pair data, read from the user content node
+   * under the workflow's or job's rebalancer context in the given property store.
+   * @param propertyStore property store to read from
+   * @param workflowJobResource name of the workflow or job whose user content is read
+   * @param key the key of the key-value pair
    * @return null if there is no such pair, otherwise return a String
    */
-  protected static String getWorkflowJobUserContent(HelixManager manager,
+  protected static String getWorkflowJobUserContent(HelixPropertyStore<ZNRecord> propertyStore,
       String workflowJobResource, String key) {
-    ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
-        AccessOption.PERSISTENT);
+    ZNRecord r = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
+        workflowJobResource, USER_CONTENT_NODE), null, AccessOption.PERSISTENT);
     return r != null ? r.getSimpleField(key) : null;
   }
 
@@ -346,15 +346,15 @@ public class TaskUtil {
 
   /**
    * Get user defined task level key-value pair data
-   * @param manager a connection to Helix
+   * @param propertyStore property store to read the job's user content node from
    * @param job the name of job
    * @param task the name of the task
    * @param key the key of key-value pair
    * @return null if there is no such pair, otherwise return a String
    */
-  protected static String getTaskUserContent(HelixManager manager, String job, String task,
-      String key) {
-    ZNRecord r = manager.getHelixPropertyStore().get(
+  protected static String getTaskUserContent(HelixPropertyStore<ZNRecord> propertyStore, String job,
+      String task, String key) {
+    ZNRecord r = propertyStore.get(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
         AccessOption.PERSISTENT);
     return r != null ? (r.getMapField(task) != null ? r.getMapField(task).get(key) : null) : null;
@@ -386,6 +386,30 @@ public class TaskUtil {
   }
 
   /**
+   * Look up UserContentStore content by key; throws HelixException for an unsupported scope.
+   * @param propertyStore property store that holds the user content nodes
+   * @param key key used when the content was written
+   * @param scope WORKFLOW/JOB read a simple field of that node; TASK reads the task's map field
+   * @param workflowName name of the workflow; used only when scope is WORKFLOW
+   * @param jobName name of the job; used when scope is JOB or TASK
+   * @param taskName name of the task; used only when scope is TASK
+   * @return the stored value, or null if the node, the task entry, or the key does not exist
+   */
+  protected static String getUserContent(HelixPropertyStore<ZNRecord> propertyStore, String key,
+      UserContentStore.Scope scope, String workflowName, String jobName, String taskName) {
+    switch (scope) {
+      case WORKFLOW:
+        return getWorkflowJobUserContent(propertyStore, workflowName, key);
+      case JOB:
+        return getWorkflowJobUserContent(propertyStore, jobName, key);
+      case TASK:
+        return getTaskUserContent(propertyStore, jobName, taskName, key);
+      default:
+        throw new HelixException("Invalid scope : " + scope.name());
+    }
+  }
+
+  /**
    * Get a workflow-qualified job name for a single-job workflow
    * @param singleJobWorkflow the name of the single-job workflow
    * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow

http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
index b188be7..ba80e88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
+++ b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
@@ -25,7 +25,6 @@ import org.apache.helix.HelixManager;
 /**
  * UserContentStore provides default implementation of user defined key-value pair store per task,
  * job and workflow level.
- *
  * TODO: This class should be merged to Task interface when Helix bump up to Java 8
  */
 public abstract class UserContentStore {
@@ -47,10 +46,10 @@ public abstract class UserContentStore {
     TASK
   }
 
-  private HelixManager _manager;
-  private String _workflowName;
-  private String _jobName;
-  private String _taskName;
+  protected HelixManager _manager;
+  protected String _workflowName;
+  protected String _jobName;
+  protected String _taskName;
 
   /**
    * Default initialization of user content store
@@ -67,44 +66,40 @@ public abstract class UserContentStore {
   }
 
   /**
-   * Default implementation for user defined put key-value pair
+   * Default implementation for user defined put key-value pair. Warning: this method is not
+   * thread-safe - we recommend creating a different key-value pair instead of modifying the value
+   * on the same key.
    * @param key The key of key-value pair
    * @param value The value of key-value pair
    * @param scope The scope defines which layer to store
    */
   public void putUserContent(String key, String value, Scope scope) {
     switch (scope) {
-    case WORKFLOW:
-      TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value);
-      break;
-    case JOB:
-      TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value);
-      break;
-    case TASK:
-      TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value);
-      break;
-    default:
-      throw new HelixException("Invalid scope : " + scope.name());
+      case WORKFLOW:
+        TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value);
+        break;
+      case JOB:
+        TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value);
+        break;
+      case TASK:
+        TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value);
+        break;
+      default:
+        throw new HelixException("Invalid scope : " + scope.name());
     }
   }
 
   /**
-   * Default implementation for user defined get key-value pair
+   * Default implementation for user defined get key-value pair. Warning: this method is not
+   * thread-safe - we recommend creating a different key-value pair instead of modifying the value
+   * on the same key.
    * @param key The key of key-value pair
    * @param scope The scope defines which layer that key-value pair stored
    * @return Null if key-value pair not found or this content store does not exists. Otherwise,
    *         return a String
    */
   public String getUserContent(String key, Scope scope) {
-    switch (scope) {
-    case WORKFLOW:
-      return TaskUtil.getWorkflowJobUserContent(_manager, _workflowName, key);
-    case JOB:
-      return TaskUtil.getWorkflowJobUserContent(_manager, _jobName, key);
-    case TASK:
-      return TaskUtil.getTaskUserContent(_manager, _jobName, _taskName, key);
-    default:
-      throw new HelixException("Invalid scope : " + scope.name());
-    }
+    return TaskUtil.getUserContent(_manager.getHelixPropertyStore(), key, scope, _workflowName,
+        _jobName, _taskName);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
new file mode 100644
index 0000000..392c278
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
@@ -0,0 +1,144 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestGetUserContentStore extends TaskTestBase {
+  private static final String JOB_COMMAND = "DummyCommand";
+  private Map<String, String> _jobCommandMap;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _participants = new MockParticipantManager[_numNodes];
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    // Set up the cluster and register the participant instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < _numNodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Start mock participants that can run WriteTask
+    for (int i = 0; i < _numNodes; i++) {
+      final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+
+      // Map the "WriteTask" command to a factory that creates WriteTask instances
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+      TaskFactory writeTaskFactory = new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new WriteTask(context);
+        }
+      };
+      taskFactoryReg.put("WriteTask", writeTaskFactory);
+
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    _jobCommandMap = new HashMap<>();
+  }
+
+  @Test
+  public void testGetUserContentStore() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
+    configBuilder.setAllowOverlapJobAssignment(true);
+    workflowBuilder.setWorkflowConfig(configBuilder.build());
+
+    List<String> jobsThatRan = new ArrayList<>();
+    // Create 5 jobs with 1 WriteTask each; record their workflow-namespaced job names
+    for (int i = 0; i < 5; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBuilder = new JobConfig.Builder().setCommand(JOB_COMMAND)
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
+      workflowBuilder.addJob("JOB" + i, jobConfigBuilder);
+      jobsThatRan.add(workflowName + "_JOB" + i);
+    }
+
+    // Start the workflow and wait until completion
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // Sum the values each job's task wrote at WORKFLOW scope under its namespaced job name
+    int runCount = 0;
+    for (String jobName : jobsThatRan) {
+      String value = _driver.getUserContent(jobName, UserContentStore.Scope.WORKFLOW, workflowName,
+          jobName, null);
+      runCount += Integer.parseInt(value);
+    }
+    Assert.assertEquals(runCount, 5);
+  }
+
+  /**
+   * Writes "1" under its own job name at WORKFLOW scope (MockTask extends UserContentStore).
+   */
+  private static class WriteTask extends MockTask {
+
+    public WriteTask(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public TaskResult run() {
+      putUserContent(_jobName, Integer.toString(1), Scope.WORKFLOW);
+      return new TaskResult(TaskResult.Status.COMPLETED, "");
+    }
+  }
+}