You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:02 UTC

[06/33] helix git commit: TaskUtil.getWorkflowCfg throws NPE if workflow doesn't exist.

TaskUtil.getWorkflowCfg throws NPE if workflow doesn't exist.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6d42db46
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6d42db46
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6d42db46

Branch: refs/heads/helix-0.6.x
Commit: 6d42db462bc65ed9f94f22ca6e2de83cb703ea87
Parents: d213c1a
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 23 10:16:03 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:43:54 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 15 +++----
 .../java/org/apache/helix/task/TaskUtil.java    | 41 ++++++--------------
 .../integration/task/TestRecurringJobQueue.java | 16 ++++++++
 3 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/6d42db46/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index ce7bbf0..b55d9d0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -222,7 +222,8 @@ public class TaskDriver {
    *
    * Example:
    *
-   * WorkflowConfig currentWorkflowConfig = TaskUtil.getWorkflowCfg(_manager, workflow);
+   * _driver = new TaskDriver ...
+   * WorkflowConfig currentWorkflowConfig = _driver.getWorkflowCfg(_manager, workflow);
    * WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
 
    * // make needed changes to the config here
@@ -236,7 +237,7 @@ public class TaskDriver {
    */
   public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
     WorkflowConfig currentConfig =
-        TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, workflow);
+        TaskUtil.getWorkflowCfg(_accessor, workflow);
     if (currentConfig == null) {
       throw new HelixException("Workflow " + workflow + " does not exist!");
     }
@@ -270,7 +271,7 @@ public class TaskDriver {
   // TODO: need to make sure the queue is stopped or completed before flush the queue.
   public void flushQueue(String queueName) {
     WorkflowConfig config =
-        TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
+        TaskUtil.getWorkflowCfg(_accessor, queueName);
     if (config == null) {
       throw new IllegalArgumentException("Queue does not exist!");
     }
@@ -339,7 +340,7 @@ public class TaskDriver {
    */
   public void deleteJob(final String queueName, final String jobName) {
     WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
+        TaskUtil.getWorkflowCfg(_accessor, queueName);
 
     if (workflowCfg == null) {
       throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
@@ -384,7 +385,7 @@ public class TaskDriver {
    */
   private void deleteJobFromScheduledQueue(final String queueName, final String jobName) {
     WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
+        TaskUtil.getWorkflowCfg(_accessor, queueName);
 
     if (workflowCfg == null) {
       throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
@@ -696,7 +697,7 @@ public class TaskDriver {
   }
 
   public WorkflowConfig getWorkflowConfig(String workflow) {
-    return TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, workflow);
+    return TaskUtil.getWorkflowCfg(_accessor, workflow);
   }
 
   public WorkflowContext getWorkflowContext(String workflow) {
@@ -712,7 +713,7 @@ public class TaskDriver {
   }
 
   public void list(String resource) {
-    WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, resource);
+    WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_accessor, resource);
     if (wCfg == null) {
       LOG.error("Workflow " + resource + " does not exist!");
       return;

http://git-wip-us.apache.org/repos/asf/helix/blob/6d42db46/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 524b889..ca274d0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -40,7 +39,6 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.log4j.Logger;
@@ -98,21 +96,19 @@ public class TaskUtil {
   /**
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
    *
-   * @param cfgAccessor      Config accessor to access Helix configs
-   * @param accessor         Accessor to access Helix configs
-   * @param clusterName      Cluster name
-   * @param workflowResource The name of the workflow resource.
+   * @param accessor  Accessor to access Helix configs
+   * @param workflow The name of the workflow.
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  public static WorkflowConfig getWorkflowCfg(ConfigAccessor cfgAccessor,
-      HelixDataAccessor accessor, String clusterName, String workflowResource) {
-    Map<String, String> workflowCfg =
-        getResourceConfigMap(cfgAccessor, accessor, clusterName, workflowResource);
+  public static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
+    HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
     if (workflowCfg == null) {
       return null;
     }
-    WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
+
+    WorkflowConfig.Builder b =
+        WorkflowConfig.Builder.fromMap(workflowCfg.getRecord().getSimpleFields());
 
     return b.build();
   }
@@ -121,13 +117,12 @@ public class TaskUtil {
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
    *
    * @param manager          Helix manager object used to connect to Helix.
-   * @param workflowResource The name of the workflow resource.
+   * @param workflow The name of the workflow resource.
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
    * workflow, null otherwise.
    */
-  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
-    return getWorkflowCfg(manager.getConfigAccessor(), manager.getHelixDataAccessor(),
-        manager.getClusterName(), workflowResource);
+  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
+    return getWorkflowCfg(manager.getHelixDataAccessor(), workflow);
   }
 
   /**
@@ -452,18 +447,6 @@ public class TaskUtil {
     return workflowBuilder.build();
   }
 
-  private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor,
-      HelixDataAccessor accessor, String clusterName, String resource) {
-    HelixConfigScope scope = getResourceConfigScope(clusterName, resource);
-
-    List<String> cfgKeys = cfgAccessor.getKeys(scope);
-    if (cfgKeys == null || cfgKeys.isEmpty()) {
-      return null;
-    }
-
-    return getResourceConfig(accessor, resource).getRecord().getSimpleFields();
-  }
-
   private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));
@@ -522,7 +505,7 @@ public class TaskUtil {
     return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
   }
 
-  public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().resourceConfig(resource);
+  public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String workflow) {
+    return accessor.keyBuilder().resourceConfig(workflow);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/6d42db46/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index cb44f0e..d83c5eb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -47,6 +47,8 @@ import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -367,6 +369,20 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
         String.format("%s_%s", scheduledQueue, jobNames.get(JOB_COUNTS - 1)));
   }
 
+  @Test
+  public void testGetNoExistWorkflowConfig() {
+    String randomName = "randomJob";
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(randomName);
+    Assert.assertNull(workflowConfig);
+    JobConfig jobConfig = _driver.getJobConfig(randomName);
+    Assert.assertNull(jobConfig);
+    WorkflowContext workflowContext = _driver.getWorkflowContext(randomName);
+    Assert.assertNull(workflowContext);
+    JobContext jobContext = _driver.getJobContext(randomName);
+    Assert.assertNull(jobContext);
+
+  }
+
   private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();