You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/27 23:17:09 UTC

helix git commit: [HELIX-685] Set job state to NOT_STARTED at job creation in WorkflowRebalancer

Repository: helix
Updated Branches:
  refs/heads/master f4942c492 -> 7c8a271c1


[HELIX-685] Set job state to NOT_STARTED at job creation in WorkflowRebalancer

Previously, when jobs were created, their states were left unset (null). Jobs are now set to NOT_STARTED at workflow context creation for more transparency on the client side.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7c8a271c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7c8a271c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7c8a271c

Branch: refs/heads/master
Commit: 7c8a271c1c2057fb799312cc9aa6399f8a091b5e
Parents: f4942c4
Author: narendly <na...@gmail.com>
Authored: Mon Mar 26 14:29:14 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Tue Mar 27 16:16:35 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/task/WorkflowRebalancer.java   |  9 +-
 .../helix/task/TestJobStateOnCreation.java      | 89 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7c8a271c/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index e3214f8..1590039 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -40,7 +40,6 @@ import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.model.builder.IdealStateBuilder;
 import org.slf4j.Logger;
@@ -73,6 +72,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
       workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
       workflowCtx.setStartTime(System.currentTimeMillis());
       workflowCtx.setName(workflow);
+      // Initialize all job states belonging to this workflow context to NOT_STARTED
+      for (String jobState : workflowCtx.getJobStates().keySet()) {
+        workflowCtx.setJobState(jobState, TaskState.NOT_STARTED);
+      }
       LOG.debug("Workflow context is created for " + workflow);
     }
 
@@ -180,7 +183,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
       if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
         LOG.debug(String.format("Workflow %s already have enough job in progress, "
-                + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+            + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
         break;
       }
 
@@ -546,4 +549,4 @@ public class WorkflowRebalancer extends TaskRebalancer {
     // Nothing to do here with workflow resource.
     return currentIdealState;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/7c8a271c/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
new file mode 100644
index 0000000..4b5b309
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
@@ -0,0 +1,89 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestJobStateOnCreation extends TaskSynchronizedTestBase {
+
+  private static final String WORKFLOW_NAME = "testWorkflow";
+
+  private ClusterDataCache _cache;
+  private IdealState _idealState;
+  private Resource _resource;
+  private CurrentStateOutput _currStateOutput;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _cache = new ClusterDataCache();
+    _idealState = new IdealState(WORKFLOW_NAME);
+    _resource = new Resource(WORKFLOW_NAME);
+    _currStateOutput = new CurrentStateOutput();
+    _participants =  new MockParticipantManager[_numNodes];
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    createManagers();
+  }
+
+  @Test
+  public void testJobStateOnCreation() {
+    Workflow.Builder builder = new Workflow.Builder(WORKFLOW_NAME);
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WORKFLOW_NAME).setTargetPartitionStates(Sets.newHashSet("SLAVE","MASTER"))
+        .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+    String jobName = "job";
+    builder = builder.addJob(jobName, jobConfigBuilder);
+    Workflow workflow = builder.build();
+    WorkflowConfig workflowConfig = workflow.getWorkflowConfig();
+    JobConfig jobConfig = jobConfigBuilder.build();
+    workflowConfig.getRecord().merge(jobConfig.getRecord());
+
+    _cache.getJobConfigMap().put(WORKFLOW_NAME + "_" + jobName, jobConfig);
+    _cache.getWorkflowConfigMap().put(WORKFLOW_NAME, workflowConfig);
+
+    WorkflowRebalancer workflowRebalancer = new WorkflowRebalancer();
+    workflowRebalancer.init(_manager);
+    ResourceAssignment resourceAssignment = workflowRebalancer
+        .computeBestPossiblePartitionState(_cache, _idealState, _resource, _currStateOutput);
+
+    WorkflowContext workflowContext = _cache.getWorkflowContext(WORKFLOW_NAME);
+    Map<String, TaskState> jobStates = workflowContext.getJobStates();
+    for (String job : jobStates.keySet()) {
+      Assert.assertEquals(jobStates.get(job), TaskState.NOT_STARTED);
+    }
+  }
+}
\ No newline at end of file