You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:08 UTC

[12/33] helix git commit: Add static methods into TaskDriver for getting configuration/context for jobs and workflows.

Add static methods into TaskDriver for getting configuration/context for jobs and workflows.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aeb6f3ec
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aeb6f3ec
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aeb6f3ec

Branch: refs/heads/helix-0.6.x
Commit: aeb6f3ec7ab973316fd3468e9cbb0052a6a4306e
Parents: d386aff
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Mar 21 13:46:01 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:53:34 2016 -0700

----------------------------------------------------------------------
 .../webapp/resources/JobQueueResource.java      | 19 +++++++------------
 .../helix/webapp/resources/JobResource.java     | 11 ++---------
 .../java/org/apache/helix/task/TaskDriver.java  | 16 ++++++++++++++++
 .../java/org/apache/helix/task/TaskUtil.java    | 20 ++++++++++----------
 .../integration/task/TestTaskRebalancer.java    |  2 --
 5 files changed, 35 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/aeb6f3ec/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
index 830e16b..954ae73 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -21,15 +21,10 @@ package org.apache.helix.webapp.resources;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.store.HelixPropertyStore;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskUtil;
@@ -91,14 +86,14 @@ public class JobQueueResource extends ServerResource {
         ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
-    // Get job queue config
-    HelixProperty jobQueueConfig = accessor.getProperty(keyBuilder.resourceConfig(jobQueueName));
+    TaskDriver taskDriver = new TaskDriver(zkClient, clusterName);
 
+    // Get job queue config
+    // TODO: fix this to use workflowConfig.
+    ResourceConfig jobQueueConfig = accessor.getProperty(keyBuilder.resourceConfig(jobQueueName));
+    
     // Get job queue context
-    String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
-    HelixPropertyStore<ZNRecord> propertyStore =
-        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null);
-    WorkflowContext ctx = TaskUtil.getWorkflowContext(propertyStore, jobQueueName);
+    WorkflowContext ctx = taskDriver.getWorkflowContext(jobQueueName);
 
     // Create the result
     ZNRecord hostedEntitiesRecord = new ZNRecord(jobQueueName);

http://git-wip-us.apache.org/repos/asf/helix/blob/aeb6f3ec/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
index cdcde35..d31c81b 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
@@ -25,10 +25,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.store.HelixPropertyStore;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskUtil;
@@ -119,14 +116,10 @@ public class JobResource extends ServerResource {
     // Get job queue config
     String namespacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, jobName);
     HelixProperty jobConfig = accessor.getProperty(keyBuilder.resourceConfig(namespacedJobName));
+    TaskDriver taskDriver = new TaskDriver(zkClient, clusterName);
 
     // Get job queue context
-    JobContext ctx = null;
-    String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
-    HelixPropertyStore<ZNRecord> propertyStore =
-        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null);
-
-    ctx = TaskUtil.getJobContext(propertyStore, namespacedJobName);
+    JobContext ctx = taskDriver.getJobContext(namespacedJobName);
 
     // Create the result
     ZNRecord hostedEntitiesRecord = new ZNRecord(namespacedJobName);

http://git-wip-us.apache.org/repos/asf/helix/blob/aeb6f3ec/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 99bcb62..c0d7852 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -733,6 +733,22 @@ public class TaskDriver {
     return TaskUtil.getJobContext(_propertyStore, job);
   }
 
+  public static JobContext getJobContext(HelixManager manager, String job) {
+    return TaskUtil.getJobContext(manager, job);
+  }
+
+  public static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
+    return TaskUtil.getWorkflowCfg(manager, workflow);
+  }
+
+  public static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
+    return TaskUtil.getWorkflowContext(manager, workflow);
+  }
+
+  public static JobConfig getJobConfig(HelixManager manager, String job) {
+    return TaskUtil.getJobCfg(manager, job);
+  }
+
   public void list(String resource) {
     WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_accessor, resource);
     if (wCfg == null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/aeb6f3ec/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 513c14e..8745a82 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -51,7 +51,7 @@ public class TaskUtil {
 
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.
-   * This method is internal API, please use TaskDriver.getJobConfig();
+   * This method is an internal API; please use the corresponding method TaskDriver.getJobConfig() instead.
    *
    * @param accessor    Accessor to access Helix configs
    * @param jobResource The name of the job resource
@@ -78,7 +78,7 @@ public class TaskUtil {
 
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.
-   * This method is internal API, please use TaskDriver.getJobConfig();
+   * This method is an internal API; please use the corresponding method TaskDriver.getJobConfig() instead.
    *
    * @param manager     HelixManager object used to connect to Helix.
    * @param jobResource The name of the job resource.
@@ -91,7 +91,7 @@ public class TaskUtil {
 
   /**
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
-   * This method is internal API, please use TaskDriver.getWorkflowConfig();
+   * This method is an internal API; please use the corresponding method TaskDriver.getWorkflowConfig() instead.
    *
    * @param accessor  Accessor to access Helix configs
    * @param workflow The name of the workflow.
@@ -112,7 +112,7 @@ public class TaskUtil {
 
   /**
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
-   * This method is internal API, please use TaskDriver.getWorkflowConfig();
+   * This method is an internal API; please use the corresponding method TaskDriver.getWorkflowConfig() instead.
    *
    * @param manager          Helix manager object used to connect to Helix.
    * @param workflow The name of the workflow resource.
@@ -143,7 +143,7 @@ public class TaskUtil {
    * @param jobResource   The name of the job
    * @return the {@link JobContext}, or null if none is available
    */
-  public static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore,
+  protected static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore,
       String jobResource) {
     ZNRecord r = propertyStore
         .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
@@ -179,13 +179,13 @@ public class TaskUtil {
 
   /**
    * Get the runtime context of a single workflow.
-   * This method is internal API, please use TaskDriver.getWorkflowContext();
+   * This method is an internal API; please use the corresponding method TaskDriver.getWorkflowContext() instead.
    *
    * @param propertyStore    Property store of the cluster
    * @param workflowResource The name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
-  public static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
+  protected static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
       String workflowResource) {
     ZNRecord r = propertyStore.get(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
@@ -195,7 +195,7 @@ public class TaskUtil {
 
   /**
    * Get the runtime context of a single workflow.
-   * This method is internal API, please use TaskDriver.getWorkflowContext();
+   * This method is an internal API; please use the corresponding method TaskDriver.getWorkflowContext() instead.
    *
    * @param manager          a connection to Helix
    * @param workflowResource the name of the workflow
@@ -212,7 +212,7 @@ public class TaskUtil {
    * @param workflowResource the name of the workflow
    * @param ctx              the up-to-date {@link WorkflowContext} for the workflow
    */
-  public static void setWorkflowContext(HelixManager manager, String workflowResource,
+  protected static void setWorkflowContext(HelixManager manager, String workflowResource,
       WorkflowContext ctx) {
     manager.getHelixPropertyStore().set(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
@@ -298,7 +298,7 @@ public class TaskUtil {
    * @param accessor Helix data accessor
    * @param resource the name of the resource changed to triggering the execution
    */
-  public static void invokeRebalance(HelixDataAccessor accessor, String resource) {
+  protected static void invokeRebalance(HelixDataAccessor accessor, String resource) {
     // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
     LOG.info("invoke rebalance for " + resource);
     PropertyKey key = accessor.keyBuilder().idealStates(resource);

http://git-wip-us.apache.org/repos/asf/helix/blob/aeb6f3ec/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 3a5b179..74b10fd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -184,8 +184,6 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     // Wait for job to finish and expire
     TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
-    TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName());
-    Thread.sleep(expiry);
 
     // Ensure workflow config and context were cleaned up by now
     Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,