You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:02 UTC
[06/33] helix git commit: TaskUtil.getWorkflowCfg throws NPE if workflow doesn't exist.
TaskUtil.getWorkflowCfg throws NPE if workflow doesn't exist.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6d42db46
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6d42db46
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6d42db46
Branch: refs/heads/helix-0.6.x
Commit: 6d42db462bc65ed9f94f22ca6e2de83cb703ea87
Parents: d213c1a
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 23 10:16:03 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:43:54 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 15 +++----
.../java/org/apache/helix/task/TaskUtil.java | 41 ++++++--------------
.../integration/task/TestRecurringJobQueue.java | 16 ++++++++
3 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/6d42db46/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index ce7bbf0..b55d9d0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -222,7 +222,8 @@ public class TaskDriver {
*
* Example:
*
- * WorkflowConfig currentWorkflowConfig = TaskUtil.getWorkflowCfg(_manager, workflow);
+ * _driver = new TaskDriver ...
+ * WorkflowConfig currentWorkflowConfig = _driver.getWorkflowConfig(workflow);
* WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
* // make needed changes to the config here
@@ -236,7 +237,7 @@ public class TaskDriver {
*/
public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
WorkflowConfig currentConfig =
- TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, workflow);
+ TaskUtil.getWorkflowCfg(_accessor, workflow);
if (currentConfig == null) {
throw new HelixException("Workflow " + workflow + " does not exist!");
}
@@ -270,7 +271,7 @@ public class TaskDriver {
// TODO: need to make sure the queue is stopped or completed before flush the queue.
public void flushQueue(String queueName) {
WorkflowConfig config =
- TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
+ TaskUtil.getWorkflowCfg(_accessor, queueName);
if (config == null) {
throw new IllegalArgumentException("Queue does not exist!");
}
@@ -339,7 +340,7 @@ public class TaskDriver {
*/
public void deleteJob(final String queueName, final String jobName) {
WorkflowConfig workflowCfg =
- TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
+ TaskUtil.getWorkflowCfg(_accessor, queueName);
if (workflowCfg == null) {
throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
@@ -384,7 +385,7 @@ public class TaskDriver {
*/
private void deleteJobFromScheduledQueue(final String queueName, final String jobName) {
WorkflowConfig workflowCfg =
- TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
+ TaskUtil.getWorkflowCfg(_accessor, queueName);
if (workflowCfg == null) {
throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
@@ -696,7 +697,7 @@ public class TaskDriver {
}
public WorkflowConfig getWorkflowConfig(String workflow) {
- return TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, workflow);
+ return TaskUtil.getWorkflowCfg(_accessor, workflow);
}
public WorkflowContext getWorkflowContext(String workflow) {
@@ -712,7 +713,7 @@ public class TaskDriver {
}
public void list(String resource) {
- WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, resource);
+ WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_accessor, resource);
if (wCfg == null) {
LOG.error("Workflow " + resource + " does not exist!");
return;
http://git-wip-us.apache.org/repos/asf/helix/blob/6d42db46/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 524b889..ca274d0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
@@ -40,7 +39,6 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.log4j.Logger;
@@ -98,21 +96,19 @@ public class TaskUtil {
/**
* Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
*
- * @param cfgAccessor Config accessor to access Helix configs
- * @param accessor Accessor to access Helix configs
- * @param clusterName Cluster name
- * @param workflowResource The name of the workflow resource.
+ * @param accessor Accessor to access Helix configs
+ * @param workflow The name of the workflow.
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
* workflow, null otherwise.
*/
- public static WorkflowConfig getWorkflowCfg(ConfigAccessor cfgAccessor,
- HelixDataAccessor accessor, String clusterName, String workflowResource) {
- Map<String, String> workflowCfg =
- getResourceConfigMap(cfgAccessor, accessor, clusterName, workflowResource);
+ public static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
+ HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
if (workflowCfg == null) {
return null;
}
- WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
+
+ WorkflowConfig.Builder b =
+ WorkflowConfig.Builder.fromMap(workflowCfg.getRecord().getSimpleFields());
return b.build();
}
@@ -121,13 +117,12 @@ public class TaskUtil {
* Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
*
* @param manager Helix manager object used to connect to Helix.
- * @param workflowResource The name of the workflow resource.
+ * @param workflow The name of the workflow.
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
* workflow, null otherwise.
*/
- public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
- return getWorkflowCfg(manager.getConfigAccessor(), manager.getHelixDataAccessor(),
- manager.getClusterName(), workflowResource);
+ public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
+ return getWorkflowCfg(manager.getHelixDataAccessor(), workflow);
}
/**
@@ -452,18 +447,6 @@ public class TaskUtil {
return workflowBuilder.build();
}
- private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor,
- HelixDataAccessor accessor, String clusterName, String resource) {
- HelixConfigScope scope = getResourceConfigScope(clusterName, resource);
-
- List<String> cfgKeys = cfgAccessor.getKeys(scope);
- if (cfgKeys == null || cfgKeys.isEmpty()) {
- return null;
- }
-
- return getResourceConfig(accessor, resource).getRecord().getSimpleFields();
- }
-
private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.resourceConfig(resource));
@@ -522,7 +505,7 @@ public class TaskUtil {
return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
}
- public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String resource) {
- return accessor.keyBuilder().resourceConfig(resource);
+ public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String workflow) {
+ return accessor.keyBuilder().resourceConfig(workflow);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/6d42db46/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index cb44f0e..d83c5eb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -47,6 +47,8 @@ import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -367,6 +369,20 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
String.format("%s_%s", scheduledQueue, jobNames.get(JOB_COUNTS - 1)));
}
+ @Test
+ public void testGetNoExistWorkflowConfig() {
+ String randomName = "randomJob";
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(randomName);
+ Assert.assertNull(workflowConfig);
+ JobConfig jobConfig = _driver.getJobConfig(randomName);
+ Assert.assertNull(jobConfig);
+ WorkflowContext workflowContext = _driver.getWorkflowContext(randomName);
+ Assert.assertNull(workflowContext);
+ JobContext jobContext = _driver.getJobContext(randomName);
+ Assert.assertNull(jobContext);
+
+ }
+
private void verifyJobDeleted(String queueName, String jobName) throws Exception {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();