You are viewing a plain-text version of this content. (The canonical link to it, labeled "here" on the original page, is not preserved in this plain-text rendering.)
Posted to commits@helix.apache.org by al...@apache.org on 2021/01/13 19:39:04 UTC
[helix] branch master updated: Eliminate redundant job context writes (#1611)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new b688910 Eliminate redundant job context writes (#1611)
b688910 is described below
commit b6889108bd6f3b744242b3924074bf40bc01efbd
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jan 13 11:38:56 2021 -0800
Eliminate redundant job context writes (#1611)
With this commit, the controller writes the job context to ZooKeeper (ZK) only if the context
has been modified or is not yet present in the controller's task data cache. Otherwise, there
is no need to update the context information in ZK.
---
.../apache/helix/common/caches/TaskDataCache.java | 4 +-
.../java/org/apache/helix/task/JobContext.java | 103 ++++++++++++++++-----
.../task/TestContextRedundantUpdates.java | 94 +++++++++++++++++++
3 files changed, 175 insertions(+), 26 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 71cefcd..6c5c136 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -252,7 +252,9 @@ public class TaskDataCache extends AbstractDataCache {
* Update context of the Job
*/
public void updateJobContext(String resourceName, JobContext jobContext) {
- updateContext(resourceName, jobContext.getRecord());
+ if (!_contextMap.containsKey(resourceName) || jobContext.isJobContextModified()) {
+ updateContext(resourceName, jobContext.getRecord());
+ }
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 51e062b..83ec551 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -50,12 +50,20 @@ public class JobContext extends HelixProperty {
EXECUTION_START_TIME, // Time at which the first task of this job got scheduled
}
+ // Note: This field needs to be set if any of the job context fields have been changed.
+ // Otherwise, the context will not be written to ZK by the controller.
+ private boolean isModified;
+
public JobContext(ZNRecord record) {
super(record);
+ isModified = false;
}
public void setStartTime(long t) {
- _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+ if (getStartTime() != t) {
+ _record.setSimpleField(ContextProperties.START_TIME.toString(), String.valueOf(t));
+ markJobContextAsModified();
+ }
}
public long getStartTime() {
@@ -67,7 +75,10 @@ public class JobContext extends HelixProperty {
}
public void setFinishTime(long t) {
- _record.setSimpleField(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+ if (getFinishTime() != t) {
+ _record.setSimpleField(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+ markJobContextAsModified();
+ }
}
public long getFinishTime() {
@@ -79,8 +90,9 @@ public class JobContext extends HelixProperty {
}
public void setInfo(String info) {
- if (info != null) {
+ if (info != null && !info.equals(getInfo())) {
_record.setSimpleField(ContextProperties.INFO.toString(), info);
+ markJobContextAsModified();
}
}
@@ -89,8 +101,11 @@ public class JobContext extends HelixProperty {
}
public void setPartitionState(int p, TaskPartitionState s) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.STATE.toString(), s.name());
+ if (s != null && !s.equals(getPartitionState(p))) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.STATE.toString(), s.name());
+ markJobContextAsModified();
+ }
}
public TaskPartitionState getPartitionState(int p) {
@@ -107,8 +122,11 @@ public class JobContext extends HelixProperty {
}
public void setPartitionNumAttempts(int p, int n) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+ if (getPartitionNumAttempts(p) != n) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n));
+ markJobContextAsModified();
+ }
}
public int incrementNumAttempts(int pId) {
@@ -134,8 +152,11 @@ public class JobContext extends HelixProperty {
}
public void setPartitionStartTime(int p, long t) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.START_TIME.toString(), String.valueOf(t));
+ if (getPartitionStartTime(p) != t) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.START_TIME.toString(), String.valueOf(t));
+ markJobContextAsModified();
+ }
}
public long getPartitionStartTime(int p) {
@@ -151,8 +172,11 @@ public class JobContext extends HelixProperty {
}
public void setPartitionFinishTime(int p, long t) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+ if (getPartitionFinishTime(p) != t) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+ markJobContextAsModified();
+ }
}
public long getPartitionFinishTime(int p) {
@@ -168,8 +192,11 @@ public class JobContext extends HelixProperty {
}
public void setPartitionTarget(int p, String targetPName) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.TARGET.toString(), targetPName);
+ if (targetPName != null && !targetPName.equals(getTargetForPartition(p))) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.TARGET.toString(), targetPName);
+ markJobContextAsModified();
+ }
}
public String getTargetForPartition(int p) {
@@ -178,8 +205,11 @@ public class JobContext extends HelixProperty {
}
public void setPartitionInfo(int p, String info) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.INFO.toString(), info);
+ if (info != null && !info.equals(getPartitionInfo(p))) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.INFO.toString(), info);
+ markJobContextAsModified();
+ }
}
public String getPartitionInfo(int p) {
@@ -216,8 +246,11 @@ public class JobContext extends HelixProperty {
}
public void setTaskIdForPartition(int p, String taskId) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.TASK_ID.toString(), taskId);
+ if (taskId != null && !taskId.equals(getTaskIdForPartition(p))) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.TASK_ID.toString(), taskId);
+ markJobContextAsModified();
+ }
}
public String getTaskIdForPartition(int p) {
@@ -238,8 +271,11 @@ public class JobContext extends HelixProperty {
}
public void setAssignedParticipant(int p, String participantName) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+ if (participantName != null && !participantName.equals(getAssignedParticipant(p))) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+ markJobContextAsModified();
+ }
}
public String getAssignedParticipant(int p) {
@@ -248,8 +284,11 @@ public class JobContext extends HelixProperty {
}
public void setNextRetryTime(int p, long t) {
- Map<String, String> map = getMapField(p, true);
- map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
+ if (getNextRetryTime(p) != t) {
+ Map<String, String> map = getMapField(p, true);
+ map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t));
+ markJobContextAsModified();
+ }
}
public long getNextRetryTime(int p) {
@@ -265,7 +304,10 @@ public class JobContext extends HelixProperty {
}
public void setName(String name) {
- _record.setSimpleField(ContextProperties.NAME.name(), name);
+ if (!name.equals(getName())) {
+ _record.setSimpleField(ContextProperties.NAME.name(), name);
+ markJobContextAsModified();
+ }
}
public String getName() {
@@ -281,9 +323,9 @@ public class JobContext extends HelixProperty {
* @param t
*/
public void setExecutionStartTime(long t) {
- String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString());
- if (tStr == null) {
+ if (getExecutionStartTime() == WorkflowContext.NOT_STARTED) {
_record.setSimpleField(ContextProperties.EXECUTION_START_TIME.toString(), String.valueOf(t));
+ markJobContextAsModified();
}
}
@@ -319,6 +361,17 @@ public class JobContext extends HelixProperty {
* @param partitionSeqNumber
*/
public void removePartition(int partitionSeqNumber) {
- _record.getMapFields().remove(String.valueOf(partitionSeqNumber));
+ if (getPartitionSet().contains(partitionSeqNumber)) {
+ _record.getMapFields().remove(String.valueOf(partitionSeqNumber));
+ markJobContextAsModified();
+ }
+ }
+
+ public void markJobContextAsModified() {
+ this.isModified = true;
+ }
+
+ public boolean isJobContextModified() {
+ return this.isModified;
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
index a086d46..8a11242 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestContextRedundantUpdates.java
@@ -22,6 +22,8 @@ package org.apache.helix.integration.task;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.TestHelper;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
@@ -125,4 +127,96 @@ public class TestContextRedundantUpdates extends TaskTestBase {
Assert.assertEquals(initialWorkflowContextVersion, finalWorkflowContextVersion);
}
+
+ @Test
+ public void testRunningJobContextNoUpdate() throws Exception {
+ // Create a workflow with a long running job
+ String workflowName1 = TestHelper.getTestMethodName() + "_1";
+ String jobName = "JOB0";
+ JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setWorkflow(workflowName1).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+ .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000000"));
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+ TaskState.IN_PROGRESS);
+ _driver.pollForWorkflowState(workflowName1, TaskState.IN_PROGRESS);
+ // Make sure task has been assigned to the participant is in RUNNING state
+ boolean isTaskAssignedAndRunning = TestHelper.verify(() -> {
+ JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName1, jobName));
+ String participant = ctx.getAssignedParticipant(0);
+ TaskPartitionState taskState = ctx.getPartitionState(0);
+ return (participant != null && taskState.equals(TaskPartitionState.RUNNING));
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isTaskAssignedAndRunning);
+
+ int initialJobContextVersion = _manager.getHelixDataAccessor()
+ .getProperty(
+ _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+ .getRecord().getVersion();
+
+ // Create another workflow with short running job
+ String workflowName2 = TestHelper.getTestMethodName() + "_2";
+ JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setWorkflow(workflowName2).setNumberOfTasks(10).setNumConcurrentTasksPerInstance(100)
+ .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "2000"));
+ Workflow.Builder workflowBuilder2 =
+ new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2);
+ // Start new workflow and make sure it gets completed. This would help us to make sure pipeline
+ // has been run several times
+ _driver.start(workflowBuilder2.build());
+ _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+ int finalJobContextVersion = _manager.getHelixDataAccessor()
+ .getProperty(
+ _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+ .getRecord().getVersion();
+ Assert.assertEquals(initialJobContextVersion, finalJobContextVersion);
+ }
+
+ @Test
+ public void testCompletedJobContextNoUpdate() throws Exception {
+ // Create a workflow with a short running job
+ String workflowName1 = TestHelper.getTestMethodName() + "_1";
+ String jobName = "JOB0";
+ JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setWorkflow(workflowName1).setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100)
+ .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ _driver.pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+ TaskState.COMPLETED);
+ _driver.pollForWorkflowState(workflowName1, TaskState.COMPLETED);
+
+ int initialJobContextVersion = _manager.getHelixDataAccessor()
+ .getProperty(
+ _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+ .getRecord().getVersion();
+
+ // Create another workflow with short running job
+ String workflowName2 = TestHelper.getTestMethodName() + "_2";
+ JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setWorkflow(workflowName2).setNumberOfTasks(10).setNumConcurrentTasksPerInstance(100)
+ .setTimeoutPerTask(Long.MAX_VALUE).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "2000"));
+ Workflow.Builder workflowBuilder2 =
+ new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder2);
+ // Start new workflow and make sure it gets completed. This would help us to make sure pipeline
+ // has been run several times
+ _driver.start(workflowBuilder2.build());
+ _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+ int finalJobContextVersion = _manager.getHelixDataAccessor()
+ .getProperty(
+ _manager.getHelixDataAccessor().keyBuilder().jobContextZNode(workflowName1, jobName))
+ .getRecord().getVersion();
+ Assert.assertEquals(initialJobContextVersion, finalJobContextVersion);
+ }
}