You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/07/01 18:26:35 UTC

[helix] branch master updated: Fix RuntimeJobDag initialization with old DAG (#1131)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a50d42  Fix RuntimeJobDag initialization with old DAG (#1131)
5a50d42 is described below

commit 5a50d4254c76b38e2a4a68cf61b2a452628f7210
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jul 1 11:26:24 2020 -0700

    Fix RuntimeJobDag initialization with old DAG (#1131)
    
    Use znode version to initialize RuntimeJobDag
    
    In this commit, the version of the workflow config znode is recorded
    when the RuntimeJobDag is created, in order to avoid a race condition
    between the job purge logic and the cache refresh.
    This PR specifically stabilizes TestJobQueueCleanUp.
    This PR also allows a job to continue running even if its IdealState has been removed.
---
 .../apache/helix/common/caches/TaskDataCache.java  |  16 +++-
 .../java/org/apache/helix/task/RuntimeJobDag.java  |   9 +-
 .../task/TestJobQueueDeleteIdealState.java         | 104 +++++++++++++++++++++
 3 files changed, 123 insertions(+), 6 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 6d8100e..7d45465 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -112,7 +112,7 @@ public class TaskDataCache extends AbstractDataCache {
           WorkflowConfig workflowConfig = _workflowConfigMap.get(entry.getKey());
           _runtimeJobDagMap.put(entry.getKey(), new RuntimeJobDag(workflowConfig.getJobDag(),
               workflowConfig.isJobQueue() || !workflowConfig.isTerminable(),
-              workflowConfig.getParallelJobs()));
+              workflowConfig.getParallelJobs(), workflowConfig.getRecord().getVersion()));
         }
       } else if (entry.getValue().getRecord().getSimpleFields()
           .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) {
@@ -147,9 +147,15 @@ public class TaskDataCache extends AbstractDataCache {
     }
 
     // Removed jobs
-    for (String jobName : _jobConfigMap.keySet()) {
-      if (!newJobConfigs.containsKey(jobName) && _jobConfigMap.get(jobName).getWorkflow() != null) {
-        workflowsUpdated.add(_jobConfigMap.get(jobName).getWorkflow());
+    // Mark a workflow as updated whenever its workflow config znode version differs from the
+    // version used to build its cached RuntimeJobDag. This avoids the race condition where a job
+    // config has been purged but the job has not yet been removed from the JobDag.
+    for (String workflowName : _workflowConfigMap.keySet()) {
+      if (_runtimeJobDagMap.containsKey(workflowName)) {
+        if (_workflowConfigMap.get(workflowName).getRecord().getVersion() != _runtimeJobDagMap
+            .get(workflowName).getVersion()) {
+          workflowsUpdated.add(workflowName);
+        }
       }
     }
 
@@ -159,7 +165,7 @@ public class TaskDataCache extends AbstractDataCache {
         WorkflowConfig workflowConfig = _workflowConfigMap.get(changedWorkflow);
         _runtimeJobDagMap.put(changedWorkflow, new RuntimeJobDag(workflowConfig.getJobDag(),
             workflowConfig.isJobQueue() || !workflowConfig.isTerminable(),
-            workflowConfig.getParallelJobs()));
+            workflowConfig.getParallelJobs(), workflowConfig.getRecord().getVersion()));
       }
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index b5703f6..9115738 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -49,6 +49,7 @@ public class RuntimeJobDag extends JobDag {
   private boolean _isJobQueue;
   private int _numParallelJobs;
   private String _lastJob;
+  private int _version; // The version of the workflow config znode that is used to construct this RuntimeJobDag
 
   /**
    * Constructor for Job DAG.
@@ -58,17 +59,23 @@ public class RuntimeJobDag extends JobDag {
     _readyJobList = new ArrayDeque<>();
     _inflightJobList = new HashSet<>();
     _hasDagChanged = true;
+    _version = 0;
   }
 
-  public RuntimeJobDag(JobDag jobDag, boolean isJobQueue, int numParallelJobs) {
+  public RuntimeJobDag(JobDag jobDag, boolean isJobQueue, int numParallelJobs, int version) {
     this._childrenToParents = jobDag.getChildrenToParents();
     this._parentsToChildren = jobDag.getParentsToChildren();
     this._allNodes = jobDag.getAllNodes();
     this._isJobQueue = isJobQueue;
     this._numParallelJobs = numParallelJobs <= 0 ? DEFAULT_NUM_PARALLEL_JOBS : numParallelJobs;
+    this._version = version;
     generateJobList();
   }
 
+  public int getVersion() {
+    return this._version;
+  }
+
   @Override
   public void addParentToChild(String parent, String child) {
     _hasDagChanged = true;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java
new file mode 100644
index 0000000..1e30b55
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java
@@ -0,0 +1,104 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test to make sure that a job which is running will be able to continue its progress even when its
+ * IdealState gets deleted.
+ */
+public class TestJobQueueDeleteIdealState extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  @Test
+  public void testJobQueueDeleteIdealState() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+
+    _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND).setExpiry(5000L)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("JOB0", jobBuilder0);
+    jobQueue.enqueueJob("JOB1", jobBuilder0);
+
+    WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(jobQueue.getWorkflowConfig());
+    cfgBuilder.setJobPurgeInterval(1000L);
+    jobQueue.setWorkflowConfig(cfgBuilder.build());
+
+    _driver.start(jobQueue.build());
+
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB0"),
+        TaskState.COMPLETED);
+
+    // Wait until JOB1 goes to IN_PROGRESS
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB1"),
+        TaskState.IN_PROGRESS);
+
+    // Remove IdealState of JOB1
+    PropertyKey isKey =
+        _accessor.keyBuilder().idealStates(TaskUtil.getNamespacedJobName(jobQueueName, "JOB1"));
+    if (_accessor.getPropertyStat(isKey) != null) {
+      _accessor.removeProperty(isKey);
+    }
+
+    // Make sure IdealState has been successfully deleted
+    Assert.assertNull(_accessor.getPropertyStat(isKey));
+
+    // JOB1 should reach completed state even without IS
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB1"),
+        TaskState.COMPLETED);
+  }
+}