You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:42 UTC
[helix] 06/10: Remove previousAssignment read/write to ZK (#1074)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit f11243f693eee572c2e681ef4a3feafd3d2dd88d
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jun 17 11:26:27 2020 -0700
Remove previousAssignment read/write to ZK (#1074)
Remove previousAssignment read/write to ZK
In this commit, previousAssignment has been removed from the codebase,
and the controller will no longer read previousAssignment from, or write it to, ZK.
---
.../apache/helix/common/caches/TaskDataCache.java | 50 ++--------------------
.../java/org/apache/helix/task/JobDispatcher.java | 16 ++-----
.../org/apache/helix/task/WorkflowDispatcher.java | 5 +--
.../task/TestTaskSchedulingTwoCurrentStates.java | 13 ------
.../helix/integration/task/TestTaskStopQueue.java | 10 +----
.../helix/task/TestTargetedTaskStateChange.java | 25 +----------
6 files changed, 11 insertions(+), 108 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index adb1c54..f6f6a72 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -59,9 +59,6 @@ public class TaskDataCache extends AbstractDataCache {
// TODO: context and previous assignment should be wrapped into a class. Otherwise, int the future,
// concurrency will be hard to handle.
private Map<String, ZNRecord> _contextMap = new HashMap<>();
- private Map<String, ZNRecord> _prevAssignmentMap = new HashMap<>();
- private Set<String> _prevAssignmentToUpdate = new HashSet<>();
- private Set<String> _prevAssignmentToRemove = new HashSet<>();
private Set<String> _contextToUpdate = new HashSet<>();
private Set<String> _contextToRemove = new HashSet<>();
// The following fields have been added for quota-based task scheduling
@@ -72,8 +69,7 @@ public class TaskDataCache extends AbstractDataCache {
private Set<String> _dispatchedJobs = new HashSet<>();
private enum TaskDataType {
- CONTEXT,
- PREV_ASSIGNMENT
+ CONTEXT
}
@@ -99,7 +95,7 @@ public class TaskDataCache extends AbstractDataCache {
*/
public synchronized boolean refresh(HelixDataAccessor accessor,
Map<String, ResourceConfig> resourceConfigMap) {
- refreshContextsAndPreviousAssignments(accessor);
+ refreshContexts(accessor);
// update workflow and job configs.
_workflowConfigMap.clear();
Map<String, JobConfig> newJobConfigs = new HashMap<>();
@@ -175,11 +171,10 @@ public class TaskDataCache extends AbstractDataCache {
return true;
}
- private void refreshContextsAndPreviousAssignments(HelixDataAccessor accessor) {
+ private void refreshContexts(HelixDataAccessor accessor) {
// TODO: Need an optimize for reading context only if the refresh is needed.
long start = System.currentTimeMillis();
_contextMap.clear();
- _prevAssignmentMap.clear();
if (_controlContextProvider.getClusterName() == null || _controlContextProvider.getClusterName()
.equalsIgnoreCase(UNKNOWN_CLUSTER)) {
return;
@@ -187,22 +182,15 @@ public class TaskDataCache extends AbstractDataCache {
String path = String.format("/%s/%s%s", _controlContextProvider.getClusterName(),
PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT);
List<String> contextPaths = new ArrayList<>();
- List<String> prevAssignmentPaths = new ArrayList<>();
List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
if (childNames == null) {
return;
}
for (String resourceName : childNames) {
contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
- //Workflow does not have previous assignment
- if (!_workflowConfigMap.containsKey(resourceName)) {
- prevAssignmentPaths.add(getTaskDataPath(resourceName, TaskDataType.PREV_ASSIGNMENT));
- }
}
-
+
List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0, true);
- List<ZNRecord> prevAssignments =
- accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0, true);
for (int i = 0; i < contexts.size(); i++) {
ZNRecord context = contexts.get(i);
@@ -215,12 +203,6 @@ public class TaskDataCache extends AbstractDataCache {
}
}
- for (ZNRecord prevAssignment : prevAssignments) {
- if (prevAssignment != null) {
- _prevAssignmentMap.put(prevAssignment.getId(), prevAssignment);
- }
- }
-
if (LOG.isDebugEnabled()) {
LogUtil.logDebug(LOG, genEventInfo(),
"# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
@@ -325,13 +307,6 @@ public class TaskDataCache extends AbstractDataCache {
TaskDataType.CONTEXT);
batchDeleteData(accessor, new ArrayList<>(_contextToRemove), TaskDataType.CONTEXT);
_contextToRemove.clear();
-
- _prevAssignmentToUpdate.removeAll(_prevAssignmentToRemove);
- batchUpdateData(accessor, new ArrayList<>(_prevAssignmentToUpdate), _prevAssignmentMap,
- _prevAssignmentToUpdate, TaskDataType.PREV_ASSIGNMENT);
- batchDeleteData(accessor, new ArrayList<>(_prevAssignmentToRemove),
- TaskDataType.PREV_ASSIGNMENT);
- _prevAssignmentToRemove.clear();
}
private void batchUpdateData(HelixDataAccessor accessor, List<String> dataUpdateNames,
@@ -427,8 +402,6 @@ public class TaskDataCache extends AbstractDataCache {
switch (taskDataType) {
case CONTEXT:
return String.format("%s/%s", prevFix, TaskConstants.CONTEXT_NODE);
- case PREV_ASSIGNMENT:
- return String.format("%s/%s", prevFix, TaskConstants.PREV_RA_NODE);
}
return null;
}
@@ -451,19 +424,4 @@ public class TaskDataCache extends AbstractDataCache {
}
return null;
}
-
- public ResourceAssignment getPreviousAssignment(String resourceName) {
- return _prevAssignmentMap.get(resourceName) != null ? new ResourceAssignment(
- _prevAssignmentMap.get(resourceName)) : null;
- }
-
- public void setPreviousAssignment(String resourceName, ResourceAssignment prevAssignment) {
- _prevAssignmentMap.put(resourceName, prevAssignment.getRecord());
- _prevAssignmentToUpdate.add(resourceName);
- }
-
- public void removePrevAssignment(String resourceName) {
- _prevAssignmentMap.remove(resourceName);
- _prevAssignmentToRemove.add(resourceName);
- }
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 191a2ea..c2b724b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -133,17 +133,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
}
- // Grab the old assignment, or an empty one if it doesn't exist
- ResourceAssignment prevAssignment =
- _dataProvider.getTaskDataCache().getPreviousAssignment(jobName);
- if (prevAssignment == null) {
- prevAssignment = new ResourceAssignment(jobName);
- }
-
// Will contain the list of partitions that must be explicitly dropped from the ideal state that
// is stored in zk.
- // Fetch the previous resource assignment from the property store. This is required because of
- // HELIX-230.
Set<String> liveInstances =
jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
: _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
@@ -191,7 +182,6 @@ public class JobDispatcher extends AbstractTaskDispatcher {
// Update Workflow and Job context in data cache and ZK.
_dataProvider.updateJobContext(jobName, jobCtx);
_dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
- _dataProvider.getTaskDataCache().setPreviousAssignment(jobName, newAssignment);
LOG.debug("Job " + jobName + " new assignment "
+ Arrays.toString(newAssignment.getMappedPartitions().toArray()));
@@ -382,7 +372,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
* are accounted for
* @param jobName job name
* @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
- * @return instance -> partitionIds from previous assignment, if the instance is still live
+ * @return instance -> partitionIds from currentState, if the instance is still live
*/
protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
@@ -424,8 +414,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
}
/**
- * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment is
- * deleted) it is added from context. Otherwise, the context won't be updated.
+ * If partition is missing from prevInstanceToTaskAssignments it is added from context. Otherwise,
+ * the context won't be updated.
* @param jobCtx Job Context
* @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput
*/
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 4c143d2..53be558 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -588,7 +588,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
// Only remove from cache when remove all workflow success. Otherwise, batch write will
// clean all the contexts even if Configs and IdealStates are exists. Then all the workflows
// and jobs will rescheduled again.
- removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
+ removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
}
} else {
LOG.info("Did not clean up workflow " + workflow
@@ -596,12 +596,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
}
}
- private void removeContextsAndPreviousAssignment(String workflow, Set<String> jobs,
+ private void removeContexts(String workflow, Set<String> jobs,
TaskDataCache cache) {
if (jobs != null) {
for (String job : jobs) {
cache.removeContext(job);
- cache.removePrevAssignment(job);
}
}
cache.removeContext(workflow);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
index fe9cb3c..bb970c7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
@@ -35,15 +35,12 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MasterSlaveSMD;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
@@ -185,16 +182,6 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
}, TestHelper.WAIT_DURATION);
Assert.assertTrue(isCurrentStateCreated);
- String previousAssignmentPath = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/"
- + namespacedJobName + "/PreviousResourceAssignment";
- ResourceAssignment prevAssignment = new ResourceAssignment(namespacedJobName);
- Map<String, String> replicaMap = new HashMap<>();
- replicaMap.put(instanceP0, TaskPartitionState.RUNNING.name());
- Partition taskPartition = new Partition(namespacedJobName + "_0");
- prevAssignment.addReplicaMap(taskPartition, replicaMap);
- _manager.getHelixDataAccessor().getBaseDataAccessor().set(previousAssignmentPath,
- prevAssignment.getRecord(), AccessOption.PERSISTENT);
-
// Wait until the job is finished.
_driver.pollForJobState(jobQueueName, namespacedJobName, TaskState.COMPLETED);
Assert.assertEquals(CANCEL_COUNT.get(), 0);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
index add5687..f9ae845 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
@@ -33,8 +33,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
/**
- * This test makes sure the workflow can be stopped if previousAssignment and currentState are
- * deleted.
+ * This test makes sure the workflow can be stopped if currentState is deleted.
*/
public class TestTaskStopQueue extends TaskTestBase {
private static final long TIMEOUT = 200000L;
@@ -75,13 +74,6 @@ public class TestTaskStopQueue extends TaskTestBase {
.exists(currentStatePath, AccessOption.PERSISTENT));
}
- String previousAssignment = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/"
- + namespacedJobName + "/PreviousResourceAssignment";
- _manager.getHelixDataAccessor().getBaseDataAccessor().remove(previousAssignment,
- AccessOption.PERSISTENT);
- Assert.assertFalse(_manager.getHelixDataAccessor().getBaseDataAccessor()
- .exists(previousAssignment, AccessOption.PERSISTENT));
-
// Start the Controller
String controllerName = CONTROLLER_PREFIX + "_1";
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index 3913c2d..b79dcb9 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -20,7 +20,6 @@ package org.apache.helix.task;
*/
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -77,7 +75,6 @@ public class TestTargetedTaskStateChange {
* different instances.
* Scenario:
* Instance0: Slave, Instance1: Master, Instance2: Slave
- * PreviousAssignment of Task: Instance0: Running
* CurrentState: Instance0: Running, Instance1: Running
* Expected paMap: Instance0 -> Dropped
*/
@@ -91,8 +88,6 @@ public class TestTargetedTaskStateChange {
when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
- when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
- .thenReturn(mock._resourceAssignment);
when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
_assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -114,12 +109,9 @@ public class TestTargetedTaskStateChange {
}
/**
- * This test checks the behaviour of the controller while there is one current state which is
- * different from
- * Previous Assignment information.
+ * This test checks the behaviour of the controller while there is one current state.
* Scenario:
* Instance0: Slave, Instance1: Master, Instance2: Slave
- * PreviousAssignment of Task: Instance0: Dropped
* CurrentState: Instance0: Running
* Expected paMap: Instance1 -> Running
*/
@@ -133,8 +125,6 @@ public class TestTargetedTaskStateChange {
when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
- when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
- .thenReturn(mock._resourceAssignment2);
when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
_assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -291,15 +281,6 @@ public class TestTargetedTaskStateChange {
return currentStateOutput;
}
- private ResourceAssignment preparePreviousAssignment(String instance, String state) {
- ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
- Map<String, String> replicaMap = new HashMap<>();
- replicaMap.put(instance, state);
- Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
- prevAssignment.addReplicaMap(taskPartition, replicaMap);
- return prevAssignment;
- }
-
private class MockTestInformation {
private static final String SLAVE_INSTANCE = INSTANCE_PREFIX + "0";
private static final String MASTER_INSTANCE = INSTANCE_PREFIX + "1";
@@ -316,10 +297,6 @@ public class TestTargetedTaskStateChange {
SLAVE_INSTANCE, TaskPartitionState.RUNNING.name(), TaskPartitionState.RUNNING.name());
private CurrentStateOutput _currentStateOutput2 =
prepareCurrentState2(MASTER_INSTANCE, TaskPartitionState.RUNNING.name());
- private ResourceAssignment _resourceAssignment =
- preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.RUNNING.name());
- private ResourceAssignment _resourceAssignment2 =
- preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.DROPPED.name());
private TaskDataCache _taskDataCache = mock(TaskDataCache.class);
private RuntimeJobDag _runtimeJobDag = mock(RuntimeJobDag.class);