You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/27 23:17:09 UTC
helix git commit: [HELIX-685] Set job state to NOT_STARTED at job
creation in WorkflowRebalancer
Repository: helix
Updated Branches:
refs/heads/master f4942c492 -> 7c8a271c1
[HELIX-685] Set job state to NOT_STARTED at job creation in WorkflowRebalancer
Previously, when jobs are created, their states were not set (null). Jobs are now in NOT_STARTED at workflow context creation for more transparency on the client side.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7c8a271c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7c8a271c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7c8a271c
Branch: refs/heads/master
Commit: 7c8a271c1c2057fb799312cc9aa6399f8a091b5e
Parents: f4942c4
Author: narendly <na...@gmail.com>
Authored: Mon Mar 26 14:29:14 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Tue Mar 27 16:16:35 2018 -0700
----------------------------------------------------------------------
.../apache/helix/task/WorkflowRebalancer.java | 9 +-
.../helix/task/TestJobStateOnCreation.java | 89 ++++++++++++++++++++
2 files changed, 95 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7c8a271c/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index e3214f8..1590039 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -40,7 +40,6 @@ import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.model.builder.IdealStateBuilder;
import org.slf4j.Logger;
@@ -73,6 +72,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
workflowCtx.setStartTime(System.currentTimeMillis());
workflowCtx.setName(workflow);
+ // Initialize all job states belonging to this workflow context to NOT_STARTED
+ for (String jobState : workflowCtx.getJobStates().keySet()) {
+ workflowCtx.setJobState(jobState, TaskState.NOT_STARTED);
+ }
LOG.debug("Workflow context is created for " + workflow);
}
@@ -180,7 +183,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
LOG.debug(String.format("Workflow %s already have enough job in progress, "
- + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+ + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
break;
}
@@ -546,4 +549,4 @@ public class WorkflowRebalancer extends TaskRebalancer {
// Nothing to do here with workflow resource.
return currentIdealState;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/7c8a271c/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
new file mode 100644
index 0000000..4b5b309
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
@@ -0,0 +1,89 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestJobStateOnCreation extends TaskSynchronizedTestBase {
+
+ private static final String WORKFLOW_NAME = "testWorkflow";
+
+ private ClusterDataCache _cache;
+ private IdealState _idealState;
+ private Resource _resource;
+ private CurrentStateOutput _currStateOutput;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _cache = new ClusterDataCache();
+ _idealState = new IdealState(WORKFLOW_NAME);
+ _resource = new Resource(WORKFLOW_NAME);
+ _currStateOutput = new CurrentStateOutput();
+ _participants = new MockParticipantManager[_numNodes];
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursively(namespace);
+ }
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ createManagers();
+ }
+
+ @Test
+ public void testJobStateOnCreation() {
+ Workflow.Builder builder = new Workflow.Builder(WORKFLOW_NAME);
+ JobConfig.Builder jobConfigBuilder = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WORKFLOW_NAME).setTargetPartitionStates(Sets.newHashSet("SLAVE","MASTER"))
+ .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+ String jobName = "job";
+ builder = builder.addJob(jobName, jobConfigBuilder);
+ Workflow workflow = builder.build();
+ WorkflowConfig workflowConfig = workflow.getWorkflowConfig();
+ JobConfig jobConfig = jobConfigBuilder.build();
+ workflowConfig.getRecord().merge(jobConfig.getRecord());
+
+ _cache.getJobConfigMap().put(WORKFLOW_NAME + "_" + jobName, jobConfig);
+ _cache.getWorkflowConfigMap().put(WORKFLOW_NAME, workflowConfig);
+
+ WorkflowRebalancer workflowRebalancer = new WorkflowRebalancer();
+ workflowRebalancer.init(_manager);
+ ResourceAssignment resourceAssignment = workflowRebalancer
+ .computeBestPossiblePartitionState(_cache, _idealState, _resource, _currStateOutput);
+
+ WorkflowContext workflowContext = _cache.getWorkflowContext(WORKFLOW_NAME);
+ Map<String, TaskState> jobStates = workflowContext.getJobStates();
+ for (String job : jobStates.keySet()) {
+ Assert.assertEquals(jobStates.get(job), TaskState.NOT_STARTED);
+ }
+ }
+}
\ No newline at end of file