Posted to commits@helix.apache.org by al...@apache.org on 2020/11/04 17:51:23 UTC

[helix] branch master updated: Fix ondemand rebalance flooding and log flooding caused by dangling jobs (#1508)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new ec214b2  Fix ondemand rebalance flooding and log flooding caused by dangling jobs (#1508)
ec214b2 is described below

commit ec214b2e809c6895382019243007e0167680502b
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Wed Nov 4 09:44:08 2020 -0800

    Fix ondemand rebalance flooding and log flooding caused by dangling jobs (#1508)
    
    This commit changes the runtime DAG refresh logic to eliminate on-demand rebalance
    flooding caused by dangling jobs. It also lowers a log level to get rid of the log
    flooding caused by missing target resources.
---
 .../apache/helix/common/caches/TaskDataCache.java  | 33 +----------
 .../stages/ResourceComputationStage.java           |  3 +-
 .../TestWorkflowControllerDataProvider.java        | 66 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 32 deletions(-)
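
For readers skimming the diff below: judging from the change in TaskDataCache, the
refresh now rebuilds a workflow's RuntimeJobDag only when the persisted WorkflowConfig
version differs from the version the cached DAG was built from, so a dangling JobConfig
on its own no longer triggers a rebuild on every pipeline run. A minimal sketch of that
check, with illustrative names (workflowsToRebuild, the two version maps) that are not
the actual Helix source:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class DagRefreshSketch {
      // Illustrative only: one map holds the version of each workflow's persisted
      // WorkflowConfig, the other the version its cached RuntimeJobDag was built from.
      static Set<String> workflowsToRebuild(Map<String, Integer> workflowConfigVersions,
          Map<String, Integer> runtimeDagVersions) {
        Set<String> updated = new HashSet<>();
        for (Map.Entry<String, Integer> entry : workflowConfigVersions.entrySet()) {
          Integer dagVersion = runtimeDagVersions.get(entry.getKey());
          // Rebuild only when a cached DAG exists and was built from an older config version.
          if (dagVersion != null && !dagVersion.equals(entry.getValue())) {
            updated.add(entry.getKey());
          }
        }
        return updated;
      }
    }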

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index f6f6a72..d400e78 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -116,36 +116,7 @@ public class TaskDataCache extends AbstractDataCache {
       }
     }
 
-    // The following 3 blocks is for finding a list of workflows whose JobDAGs have been changed
-    // because their RuntimeJobDags would need to be re-built
-    // newly added jobs
-    for (String jobName : newJobConfigs.keySet()) {
-      if (!_jobConfigMap.containsKey(jobName) && newJobConfigs.get(jobName).getWorkflow() != null) {
-        workflowsUpdated.add(newJobConfigs.get(jobName).getWorkflow());
-      }
-
-      // Only for JobQueues when a new job is enqueued, there exists a race condition where only
-      // JobConfig is updated and the RuntimeJobDag does not get updated because when the client
-      // (TaskDriver) submits, it creates JobConfig ZNode first and modifies its parent JobDag next.
-      // To ensure that they are both properly updated, check that workflow's DAG and existing
-      // JobConfigs are consistent for JobQueues
-      JobConfig jobConfig = newJobConfigs.get(jobName);
-      if (_workflowConfigMap.containsKey(jobConfig.getWorkflow())) {
-        WorkflowConfig workflowConfig = _workflowConfigMap.get(jobConfig.getWorkflow());
-        // Check that the job's parent workflow's DAG contains this job
-        if ((workflowConfig.isJobQueue() || !workflowConfig.isTerminable()) && !_runtimeJobDagMap
-            .get(workflowConfig.getWorkflowId()).getAllNodes().contains(jobName)) {
-          // Inconsistency between JobConfigs and DAGs found. Add the workflow to workflowsUpdated
-          // to rebuild the RuntimeJobDag
-          workflowsUpdated.add(jobConfig.getWorkflow());
-        }
-      }
-    }
-
-    // Removed jobs
-    // This block makes sure that the workflow config has been changed.
-    // This avoid the race condition where job config has been purged but job has not been deleted
-    // from JobDag yet
+    // If the workflow config has been updated, it's possible that the dag has been changed.
     for (String workflowName : _workflowConfigMap.keySet()) {
       if (_runtimeJobDagMap.containsKey(workflowName)) {
         if (_workflowConfigMap.get(workflowName).getRecord().getVersion() != _runtimeJobDagMap
@@ -189,7 +160,7 @@ public class TaskDataCache extends AbstractDataCache {
     for (String resourceName : childNames) {
       contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
     }
-    
+
     List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0, true);
 
     for (int i = 0; i < contexts.size(); i++) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 284479c..7c2cac6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -164,7 +164,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
       if (numPartitions == 0 && idealStates != null) {
         IdealState targetIs = idealStates.get(jobConfig.getTargetResource());
         if (targetIs == null) {
-          LOG.warn("Target resource " + jobConfig.getTargetResource() + " does not exist for job " + resourceName);
+          LOG.debug("Target resource " + jobConfig.getTargetResource() + " does not exist for job "
+              + resourceName);
         } else {
           numPartitions = targetIs.getPartitionSet().size();
         }
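
The hunk above only lowers the missing-target-resource message from WARN to DEBUG and
keeps the string concatenation. A common, optional alternative (not part of this change)
is SLF4J parameterized logging, which defers building the message until DEBUG is actually
enabled. A hedged sketch with an illustrative class name:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class TargetResourceLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(TargetResourceLoggingSketch.class);

      void reportMissingTarget(String targetResource, String resourceName) {
        // The {} placeholders are only substituted when DEBUG logging is enabled.
        LOG.debug("Target resource {} does not exist for job {}", targetResource, resourceName);
      }
    }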
diff --git a/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java b/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java
index 312a1b3..d8f9c1d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java
@@ -19,10 +19,20 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.RuntimeJobDag;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -70,4 +80,60 @@ public class TestWorkflowControllerDataProvider extends TaskTestBase {
     Assert.assertTrue(expectedValuesAchieved);
 
   }
+
+  @Test (dependsOnMethods = "testResourceConfigRefresh")
+  public void testRuntimeDagRefresh() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(jobQueueName);
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    builder.enqueueJob(WorkflowGenerator.JOB_NAME_1, jobBuilder);
+    String jobName1 = TaskUtil.getNamespacedJobName(jobQueueName, WorkflowGenerator.JOB_NAME_1);
+    _driver.start(builder.build());
+
+    WorkflowControllerDataProvider cache =
+        new WorkflowControllerDataProvider("CLUSTER_" + TestHelper.getTestClassName());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      cache.requireFullRefresh();
+      cache.refresh(_manager.getHelixDataAccessor());
+      return cache.getTaskDataCache().getJobConfig(jobName1) != null;
+    }, TestHelper.WAIT_DURATION));
+    RuntimeJobDag runtimeJobDag = cache.getTaskDataCache().getRuntimeJobDag(jobQueueName);
+    Assert.assertEquals(Collections.singleton(jobName1), runtimeJobDag.getAllNodes());
+
+    // Mimic job running
+    runtimeJobDag.getNextJob();
+
+    // Add job config without adding it to the dag
+    String danglingJobName = TaskUtil.getNamespacedJobName(jobQueueName, "DanglingJob");
+    JobConfig danglingJobConfig = new JobConfig(danglingJobName, jobBuilder.build());
+    PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+    _baseAccessor
+        .create(keyBuilder.resourceConfig(danglingJobName).getPath(), danglingJobConfig.getRecord(),
+            AccessOption.PERSISTENT);
+
+    // There shouldn't be a refresh to runtime dag. The dag should only contain one job and the job is inflight.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      cache.requireFullRefresh();
+      cache.refresh(_manager.getHelixDataAccessor());
+      return cache.getTaskDataCache().getJobConfig(danglingJobName) != null;
+    }, TestHelper.WAIT_DURATION));
+    runtimeJobDag = cache.getTaskDataCache().getRuntimeJobDag(jobQueueName);
+    Assert.assertEquals(Collections.singleton(jobName1), runtimeJobDag.getAllNodes());
+    Assert.assertEquals(Collections.singleton(jobName1), runtimeJobDag.getInflightJobList());
+
+    _driver.enqueueJob(jobQueueName, WorkflowGenerator.JOB_NAME_2, jobBuilder);
+    String jobName2 = TaskUtil.getNamespacedJobName(jobQueueName, WorkflowGenerator.JOB_NAME_2);
+
+    // There should be a refresh to runtime dag.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      cache.requireFullRefresh();
+      cache.refresh(_manager.getHelixDataAccessor());
+      return cache.getTaskDataCache().getJobConfig(jobName2) != null;
+    }, TestHelper.WAIT_DURATION));
+    runtimeJobDag = cache.getTaskDataCache().getRuntimeJobDag(jobQueueName);
+    Assert.assertEquals(new HashSet<>(Arrays.asList(jobName1, jobName2)),
+        runtimeJobDag.getAllNodes());
+    Assert.assertEquals(Collections.emptyList(), runtimeJobDag.getInflightJobList());
+  }
 }
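
The new test polls the cache with TestHelper.verify until each predicate holds within
WAIT_DURATION. A generic sketch of that polling pattern (not Helix's TestHelper
implementation; names and the poll interval are illustrative):

    import java.util.function.BooleanSupplier;

    final class PollingVerifySketch {
      // Polls the condition until it returns true or the timeout elapses.
      static boolean verify(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          if (condition.getAsBoolean()) {
            return true;
          }
          Thread.sleep(50L); // brief pause between polls
        }
        return condition.getAsBoolean();
      }
    }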