Posted to commits@helix.apache.org by al...@apache.org on 2021/01/13 19:39:04 UTC

[helix] branch master updated: Eliminate redundant job context writes (#1611)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new b688910  Eliminate redundant job context writes (#1611)
b688910 is described below

commit b6889108bd6f3b744242b3924074bf40bc01efbd
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jan 13 11:38:56 2021 -0800

    Eliminate redundant job context writes (#1611)
    
    In this commit, the job context is written to ZK only if
    it has been modified. Otherwise, there is no need to update
    the context information in ZK.
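    
    A minimal usage sketch of the resulting dirty-flag behavior. This is
    illustrative, not part of the commit; the ZNRecord import path below
    assumes a recent Helix module layout (older releases use
    org.apache.helix.ZNRecord):
    
        import org.apache.helix.task.JobContext;
        import org.apache.helix.zookeeper.datamodel.ZNRecord;
    
        public class DirtyFlagDemo {
          public static void main(String[] args) {
            JobContext ctx = new JobContext(new ZNRecord("JOB0"));
            // A freshly constructed context is clean; nothing would be written to ZK.
            System.out.println(ctx.isJobContextModified()); // false
            ctx.setInfo(null);       // guarded no-op: the context stays clean
            ctx.setInfo("running");  // first real change marks the context dirty
            ctx.setInfo("running");  // same value again: the record is not touched
            System.out.println(ctx.isJobContextModified()); // true -> written once
          }
        }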
---
 .../apache/helix/common/caches/TaskDataCache.java  |   4 +-
 .../java/org/apache/helix/task/JobContext.java     | 103 ++++++++++++++++-----
 .../task/TestContextRedundantUpdates.java          |  94 +++++++++++++++++++
 3 files changed, 175 insertions(+), 26 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 71cefcd..6c5c136 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -252,7 +252,9 @@ public class TaskDataCache extends AbstractDataCache {
    * Update context of the Job
    */
   public void updateJobContext(String resourceName, JobContext jobContext) {
-    updateContext(resourceName, jobContext.getRecord());
+    if (!_contextMap.containsKey(resourceName) || jobContext.isJobContextModified()) {
+      updateContext(resourceName, jobContext.getRecord());
+    }
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 51e062b..83ec551 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -50,12 +50,20 @@ public class JobContext extends HelixProperty {
     EXECUTION_START_TIME, // Time at which the first task of this job got scheduled
   }
 
+  // Note: this flag must be set whenever any job context field changes.
+  // Otherwise, the controller will not write the context to ZK.
+  private boolean isModified;
+
   public JobContext(ZNRecord record) {
     super(record);
+    isModified = false;
   }
 
   public void setStartTime(long t) {
-    _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+    if (getStartTime() != t) {
+      _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+      markJobContextAsModified();
+    }
   }
 
   public long getStartTime() {
@@ -67,7 +75,10 @@ public class JobContext extends HelixProperty {
   }
 
   public void setFinishTime(long t) {
-    _record.setSimpleField(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+    if (getFinishTime() != t) {
+      _record.setSimpleField(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+      markJobContextAsModified();
+    }
   }
 
   public long getFinishTime() {
@@ -79,8 +90,9 @@ public class JobContext extends HelixProperty {
   }
 
   public void setInfo(String info) {
-    if (info != null) {
+    if (info != null && !info.equals(getInfo())) {
       _record.setSimpleField(ContextProperties.INFO.toString(), info);
+      markJobContextAsModified();
     }
   }
 
@@ -89,8 +101,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionState(int p, TaskPartitionState s) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.STATE.toString(), s.name());
+    if (s != null && !s.equals(getPartitionState(p))) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.STATE.toString(), s.name());
+      markJobContextAsModified();
+    }
   }
 
   public TaskPartitionState getPartitionState(int p) {
@@ -107,8 +122,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionNumAttempts(int p, int n) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+    if (getPartitionNumAttempts(p) != n) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+      markJobContextAsModified();
+    }
   }
 
   public int incrementNumAttempts(int pId) {
@@ -134,8 +152,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionStartTime(int p, long t) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.START_TIME.toString(), String.valueOf(t));
+    if (getPartitionStartTime(p) != t) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.START_TIME.toString(), String.valueOf(t));
+      markJobContextAsModified();
+    }
   }
 
   public long getPartitionStartTime(int p) {
@@ -151,8 +172,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionFinishTime(int p, long t) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+    if (getPartitionFinishTime(p) != t) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+      markJobContextAsModified();
+    }
   }
 
   public long getPartitionFinishTime(int p) {
@@ -168,8 +192,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionTarget(int p, String targetPName) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.TARGET.toString(), targetPName);
+    if (targetPName != null && !targetPName.equals(getTargetForPartition(p))) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.TARGET.toString(), targetPName);
+      markJobContextAsModified();
+    }
   }
 
   public String getTargetForPartition(int p) {
@@ -178,8 +205,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setPartitionInfo(int p, String info) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.INFO.toString(), info);
+    if (info != null && !info.equals(getPartitionInfo(p))) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.INFO.toString(), info);
+      markJobContextAsModified();
+    }
   }
 
   public String getPartitionInfo(int p) {
@@ -216,8 +246,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setTaskIdForPartition(int p, String taskId) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.TASK_ID.toString(), taskId);
+    if (taskId != null && !taskId.equals(getTaskIdForPartition(p))) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.TASK_ID.toString(), taskId);
+      markJobContextAsModified();
+    }
   }
 
   public String getTaskIdForPartition(int p) {
@@ -238,8 +271,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setAssignedParticipant(int p, String participantName) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+    if (participantName != null && !participantName.equals(getAssignedParticipant(p))) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+      markJobContextAsModified();
+    }
   }
 
   public String getAssignedParticipant(int p) {
@@ -248,8 +284,11 @@ public class JobContext extends HelixProperty {
   }
 
   public void setNextRetryTime(int p, long t) {
-    Map<String, String> map = getMapField(p, true);
-    map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
+    if (getNextRetryTime(p) != t) {
+      Map<String, String> map = getMapField(p, true);
+      map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
+      markJobContextAsModified();
+    }
   }
 
   public long getNextRetryTime(int p) {
@@ -265,7 +304,10 @@ public class JobContext extends HelixProperty {
   }
 
   public void setName(String name) {
-    _record.setSimpleField(ContextProperties.NAME.name(), name);
+    if (name != null && !name.equals(getName())) {
+      _record.setSimpleField(ContextProperties.NAME.name(), name);
+      markJobContextAsModified();
+    }
   }
 
   public String getName() {
@@ -281,9 +323,9 @@ public class JobContext extends HelixProperty {
    * @param t
    */
   public void setExecutionStartTime(long t) {
-    String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString());
-    if (tStr == null) {
+    if (getExecutionStartTime() == WorkflowContext.NOT_STARTED) {
       _record.setSimpleField(ContextProperties.EXECUTION_START_TIME.toString(), String.valueOf(t));
+      markJobContextAsModified();
     }
   }
 
@@ -319,6 +361,17 @@ public class JobContext extends HelixProperty {
    * @param partitionSeqNumber
    */
   public void removePartition(int partitionSeqNumber) {
-    _record.getMapFields().remove(String.valueOf(partitionSeqNumber));
+    if (getPartitionSet().contains(partitionSeqNumber)) {
+      _record.getMapFields().remove(String.valueOf(partitionSeqNumber));
+      markJobContextAsModified();
+    }
+  }
+
+  public void markJobContextAsModified() {
+    this.isModified = true;
+  }
+
+  public boolean isJobContextModified() {
+    return this.isModified;
   }
 }
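
The note above spells out a contract: every setter must guard against no-op
writes and mark the context dirty on a real change. A hypothetical new setter
would follow the same guard-then-mark pattern (the OWNER field and the
subclass below are illustrative only, not part of Helix):

    import org.apache.helix.task.JobContext;
    import org.apache.helix.zookeeper.datamodel.ZNRecord;

    // Illustrative only: any new JobContext setter must guard against no-op
    // writes and call markJobContextAsModified(), or the controller will
    // silently skip persisting its change.
    public class OwnerAwareJobContext extends JobContext {
      public OwnerAwareJobContext(ZNRecord record) {
        super(record);
      }

      public void setOwner(String owner) { // "OWNER" is a hypothetical field
        if (owner != null && !owner.equals(getOwner())) {
          getRecord().setSimpleField("OWNER", owner);
          markJobContextAsModified(); // without this, the write is skipped
        }
      }

      public String getOwner() {
        return getRecord().getSimpleField("OWNER");
      }
    }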
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
index a086d46..8a11242 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
@@ -22,6 +22,8 @@ package org.apache.helix.integration.task;
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
@@ -125,4 +127,96 @@ public class TestContextRedundantUpdates extends TaskTestBase {
 
     Assert.assertEquals(initialWorkflowContextVersion, finalWorkflowContextVersion);
   }
+
+  @Test
+  public void testRunningJobContextNoUpdate() throws Exception {
+    // Create a workflow with a long-running job
+    String workflowName1 = TestHelper.getTestMethodName() + "_1";
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setWorkflow(workflowName1).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+        .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000000"));
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+        TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(workflowName1, TaskState.IN_PROGRESS);
+    // Make sure the task has been assigned to a participant and is in RUNNING state
+    boolean isTaskAssignedAndRunning = TestHelper.verify(() -> {
+      JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName));
+      String participant = ctx.getAssignedParticipant(0);
+      TaskPartitionState taskState = ctx.getPartitionState(0);
+      return (participant != null && TaskPartitionState.RUNNING.equals(taskState));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isTaskAssignedAndRunning);
+
+    int initialJobContextVersion = _manager.getHelixDataAccessor()
+        .getProperty(
+            _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+        .getRecord().getVersion();
+
+    // Create another workflow with a short-running job
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setWorkflow(workflowName2).setNumberOfTasks(10).setNumConcurrentTasksPerInstance(100)
+        .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "2000"));
+    Workflow.Builder workflowBuilder2 =
+        new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2);
+    // Start the new workflow and wait for it to complete. This ensures the pipeline
+    // has run several times
+    _driver.start(workflowBuilder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    int finalJobContextVersion = _manager.getHelixDataAccessor()
+        .getProperty(
+            _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+        .getRecord().getVersion();
+    Assert.assertEquals(initialJobContextVersion, finalJobContextVersion);
+  }
+
+  @Test
+  public void testCompletedJobContextNoUpdate() throws Exception {
+    // Create a workflow with a short-running job
+    String workflowName1 = TestHelper.getTestMethodName() + "_1";
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setWorkflow(workflowName1).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+        .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+        TaskState.COMPLETED);
+    _driver.pollForWorkflowState(workflowName1, TaskState.COMPLETED);
+
+    int initialJobContextVersion = _manager.getHelixDataAccessor()
+        .getProperty(
+            _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+        .getRecord().getVersion();
+
+    // Create another workflow with a short-running job
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setWorkflow(workflowName2).setNumberOfTasks(10).setNumConcurrentTasksPerInstance(100)
+        .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "2000"));
+    Workflow.Builder workflowBuilder2 =
+        new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2);
+    // Start the new workflow and wait for it to complete. This ensures the pipeline
+    // has run several times
+    _driver.start(workflowBuilder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    int finalJobContextVersion = _manager.getHelixDataAccessor()
+        .getProperty(
+            _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+        .getRecord().getVersion();
+    Assert.assertEquals(initialJobContextVersion, finalJobContextVersion);
+  }
 }