You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/12/10 00:36:55 UTC

[helix] branch master updated: Fix redundant workflow context updates for finised workflows (#1583)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8be593b  Fix redundant workflow context updates for finised workflows (#1583)
8be593b is described below

commit 8be593b66ffbd6aa93b7639890c6b870dc81d6a0
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Dec 9 16:36:47 2020 -0800

    Fix redundant workflow context updates for finised workflows (#1583)
    
    If the workflow has been finished before, there is no need to update the workflow context.
    In this commit, this behavior has been fixed and optimized.
---
 .../org/apache/helix/task/WorkflowDispatcher.java  |  1 -
 .../task/TestContextRedundantUpdates.java          | 85 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index b8f0e67..4455822 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -217,7 +217,6 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     } else {
       LOG.debug("Workflow {} is not ready to be scheduled.", workflow);
     }
-    _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
   }
 
   public WorkflowContext getOrInitializeWorkflowContext(String workflowName, TaskDataCache cache) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
new file mode 100644
index 0000000..9e6c0fc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
@@ -0,0 +1,85 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that the context ZNode of an already completed workflow is not written again (its version stays the same).
+ */
+public class TestContextRedundantUpdates extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 1;
+    _numPartitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testFinishWorkflowContextNoUpdate() throws Exception {
+    // First workflow: a single job with one task (JOB_DELAY "1000"); wait below until job and workflow are COMPLETED
+    String workflowName1 = TestHelper.getTestMethodName() + "_1";
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setWorkflow(workflowName1).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+        .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+        TaskState.COMPLETED);
+    _driver.pollForWorkflowState(workflowName1, TaskState.COMPLETED);
+
+    int initialWorkflowContextVersion = _manager.getHelixDataAccessor()
+        .getProperty(
+            _manager.getHelixDataAccessor().keyBuilder().workflowContextZNode(workflowName1))
+        .getRecord().getVersion();
+
+    // The version read above is the baseline. Second workflow: same job name, 10 tasks, JOB_DELAY "5000"
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setWorkflow(workflowName2).setNumberOfTasks(10).setNumConcurrentTasksPerInstance(100)
+        .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "5000"));
+    Workflow.Builder workflowBuilder2 =
+        new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2);
+    // Start the second workflow and wait for it to complete. This is intended to ensure the pipeline
+    // has run several times while the first workflow was already finished
+    _driver.start(workflowBuilder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    int finalWorkflowContextVersion = _manager.getHelixDataAccessor()
+        .getProperty(
+            _manager.getHelixDataAccessor().keyBuilder().workflowContextZNode(workflowName1))
+        .getRecord().getVersion();
+
+    Assert.assertEquals(initialWorkflowContextVersion, finalWorkflowContextVersion);
+  }
+}