You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:42 UTC

[helix] 06/10: Remove previousAssignment read/write to ZK (#1074)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f11243f693eee572c2e681ef4a3feafd3d2dd88d
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Jun 17 11:26:27 2020 -0700

    Remove previousAssignment read/write to ZK (#1074)
    
    Remove previousAssignment read/write to ZK
    
    In this commit, previousAssignment has been removed from the codebase,
    and the controller will no longer read/write previousAssignment from/to ZK.
---
 .../apache/helix/common/caches/TaskDataCache.java  | 50 ++--------------------
 .../java/org/apache/helix/task/JobDispatcher.java  | 16 ++-----
 .../org/apache/helix/task/WorkflowDispatcher.java  |  5 +--
 .../task/TestTaskSchedulingTwoCurrentStates.java   | 13 ------
 .../helix/integration/task/TestTaskStopQueue.java  | 10 +----
 .../helix/task/TestTargetedTaskStateChange.java    | 25 +----------
 6 files changed, 11 insertions(+), 108 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index adb1c54..f6f6a72 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -59,9 +59,6 @@ public class TaskDataCache extends AbstractDataCache {
   // TODO: context and previous assignment should be wrapped into a class. Otherwise, int the future,
   // concurrency will be hard to handle.
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
-  private Map<String, ZNRecord> _prevAssignmentMap = new HashMap<>();
-  private Set<String> _prevAssignmentToUpdate = new HashSet<>();
-  private Set<String> _prevAssignmentToRemove = new HashSet<>();
   private Set<String> _contextToUpdate = new HashSet<>();
   private Set<String> _contextToRemove = new HashSet<>();
   // The following fields have been added for quota-based task scheduling
@@ -72,8 +69,7 @@ public class TaskDataCache extends AbstractDataCache {
   private Set<String> _dispatchedJobs = new HashSet<>();
 
   private enum TaskDataType {
-    CONTEXT,
-    PREV_ASSIGNMENT
+    CONTEXT
   }
 
 
@@ -99,7 +95,7 @@ public class TaskDataCache extends AbstractDataCache {
    */
   public synchronized boolean refresh(HelixDataAccessor accessor,
       Map<String, ResourceConfig> resourceConfigMap) {
-    refreshContextsAndPreviousAssignments(accessor);
+    refreshContexts(accessor);
     // update workflow and job configs.
     _workflowConfigMap.clear();
     Map<String, JobConfig> newJobConfigs = new HashMap<>();
@@ -175,11 +171,10 @@ public class TaskDataCache extends AbstractDataCache {
     return true;
   }
 
-  private void refreshContextsAndPreviousAssignments(HelixDataAccessor accessor) {
+  private void refreshContexts(HelixDataAccessor accessor) {
     // TODO: Need an optimize for reading context only if the refresh is needed.
     long start = System.currentTimeMillis();
     _contextMap.clear();
-    _prevAssignmentMap.clear();
     if (_controlContextProvider.getClusterName() == null || _controlContextProvider.getClusterName()
         .equalsIgnoreCase(UNKNOWN_CLUSTER)) {
       return;
@@ -187,22 +182,15 @@ public class TaskDataCache extends AbstractDataCache {
     String path = String.format("/%s/%s%s", _controlContextProvider.getClusterName(),
         PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT);
     List<String> contextPaths = new ArrayList<>();
-    List<String> prevAssignmentPaths = new ArrayList<>();
     List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
     if (childNames == null) {
       return;
     }
     for (String resourceName : childNames) {
       contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
-      //Workflow does not have previous assignment
-      if (!_workflowConfigMap.containsKey(resourceName)) {
-        prevAssignmentPaths.add(getTaskDataPath(resourceName, TaskDataType.PREV_ASSIGNMENT));
-      }
     }
-
+    
     List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0, true);
-    List<ZNRecord> prevAssignments =
-        accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0, true);
 
     for (int i = 0; i < contexts.size(); i++) {
       ZNRecord context = contexts.get(i);
@@ -215,12 +203,6 @@ public class TaskDataCache extends AbstractDataCache {
       }
     }
 
-    for (ZNRecord prevAssignment : prevAssignments) {
-      if (prevAssignment != null) {
-        _prevAssignmentMap.put(prevAssignment.getId(), prevAssignment);
-      }
-    }
-
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, genEventInfo(),
           "# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
@@ -325,13 +307,6 @@ public class TaskDataCache extends AbstractDataCache {
         TaskDataType.CONTEXT);
     batchDeleteData(accessor, new ArrayList<>(_contextToRemove), TaskDataType.CONTEXT);
     _contextToRemove.clear();
-
-    _prevAssignmentToUpdate.removeAll(_prevAssignmentToRemove);
-    batchUpdateData(accessor, new ArrayList<>(_prevAssignmentToUpdate), _prevAssignmentMap,
-        _prevAssignmentToUpdate, TaskDataType.PREV_ASSIGNMENT);
-    batchDeleteData(accessor, new ArrayList<>(_prevAssignmentToRemove),
-        TaskDataType.PREV_ASSIGNMENT);
-    _prevAssignmentToRemove.clear();
   }
 
   private void batchUpdateData(HelixDataAccessor accessor, List<String> dataUpdateNames,
@@ -427,8 +402,6 @@ public class TaskDataCache extends AbstractDataCache {
     switch (taskDataType) {
     case CONTEXT:
       return String.format("%s/%s", prevFix, TaskConstants.CONTEXT_NODE);
-    case PREV_ASSIGNMENT:
-      return String.format("%s/%s", prevFix, TaskConstants.PREV_RA_NODE);
     }
     return null;
   }
@@ -451,19 +424,4 @@ public class TaskDataCache extends AbstractDataCache {
     }
     return null;
   }
-
-  public ResourceAssignment getPreviousAssignment(String resourceName) {
-    return _prevAssignmentMap.get(resourceName) != null ? new ResourceAssignment(
-        _prevAssignmentMap.get(resourceName)) : null;
-  }
-
-  public void setPreviousAssignment(String resourceName, ResourceAssignment prevAssignment) {
-    _prevAssignmentMap.put(resourceName, prevAssignment.getRecord());
-    _prevAssignmentToUpdate.add(resourceName);
-  }
-
-  public void removePrevAssignment(String resourceName) {
-    _prevAssignmentMap.remove(resourceName);
-    _prevAssignmentToRemove.add(resourceName);
-  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 191a2ea..c2b724b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -133,17 +133,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
     }
 
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment =
-        _dataProvider.getTaskDataCache().getPreviousAssignment(jobName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(jobName);
-    }
-
     // Will contain the list of partitions that must be explicitly dropped from the ideal state that
     // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
     Set<String> liveInstances =
         jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
             : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
@@ -191,7 +182,6 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Update Workflow and Job context in data cache and ZK.
     _dataProvider.updateJobContext(jobName, jobCtx);
     _dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
-    _dataProvider.getTaskDataCache().setPreviousAssignment(jobName, newAssignment);
 
     LOG.debug("Job " + jobName + " new assignment "
         + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
@@ -382,7 +372,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
-   * @return instance -> partitionIds from previous assignment, if the instance is still live
+   * @return instance -> partitionIds from currentState, if the instance is still live
    */
   protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
       Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
@@ -424,8 +414,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
   }
 
   /**
-   * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment is
-   * deleted) it is added from context. Otherwise, the context won't be updated.
+   * If partition is missing from prevInstanceToTaskAssignments it is added from context. Otherwise,
+   * the context won't be updated.
    * @param jobCtx Job Context
    * @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput
    */
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 4c143d2..53be558 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -588,7 +588,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         // Only remove from cache when remove all workflow success. Otherwise, batch write will
         // clean all the contexts even if Configs and IdealStates are exists. Then all the workflows
         // and jobs will rescheduled again.
-        removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
+        removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
       }
     } else {
       LOG.info("Did not clean up workflow " + workflow
@@ -596,12 +596,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     }
   }
 
-  private void removeContextsAndPreviousAssignment(String workflow, Set<String> jobs,
+  private void removeContexts(String workflow, Set<String> jobs,
       TaskDataCache cache) {
     if (jobs != null) {
       for (String job : jobs) {
         cache.removeContext(job);
-        cache.removePrevAssignment(job);
       }
     }
     cache.removeContext(workflow);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
index fe9cb3c..bb970c7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
@@ -35,15 +35,12 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MasterSlaveSMD;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
@@ -185,16 +182,6 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
     }, TestHelper.WAIT_DURATION);
     Assert.assertTrue(isCurrentStateCreated);
 
-    String previousAssignmentPath = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/"
-        + namespacedJobName + "/PreviousResourceAssignment";
-    ResourceAssignment prevAssignment = new ResourceAssignment(namespacedJobName);
-    Map<String, String> replicaMap = new HashMap<>();
-    replicaMap.put(instanceP0, TaskPartitionState.RUNNING.name());
-    Partition taskPartition = new Partition(namespacedJobName + "_0");
-    prevAssignment.addReplicaMap(taskPartition, replicaMap);
-    _manager.getHelixDataAccessor().getBaseDataAccessor().set(previousAssignmentPath,
-        prevAssignment.getRecord(), AccessOption.PERSISTENT);
-
     // Wait until the job is finished.
     _driver.pollForJobState(jobQueueName, namespacedJobName, TaskState.COMPLETED);
     Assert.assertEquals(CANCEL_COUNT.get(), 0);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
index add5687..f9ae845 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
@@ -33,8 +33,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
- * This test makes sure the workflow can be stopped if previousAssignment and currentState are
- * deleted.
+ * This test makes sure the workflow can be stopped if currentState is deleted.
  */
 public class TestTaskStopQueue extends TaskTestBase {
   private static final long TIMEOUT = 200000L;
@@ -75,13 +74,6 @@ public class TestTaskStopQueue extends TaskTestBase {
           .exists(currentStatePath, AccessOption.PERSISTENT));
     }
 
-    String previousAssignment = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/"
-        + namespacedJobName + "/PreviousResourceAssignment";
-    _manager.getHelixDataAccessor().getBaseDataAccessor().remove(previousAssignment,
-        AccessOption.PERSISTENT);
-    Assert.assertFalse(_manager.getHelixDataAccessor().getBaseDataAccessor()
-        .exists(previousAssignment, AccessOption.PERSISTENT));
-
     // Start the Controller
     String controllerName = CONTROLLER_PREFIX + "_1";
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index 3913c2d..b79dcb9 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -20,7 +20,6 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -77,7 +75,6 @@ public class TestTargetedTaskStateChange {
    * different instances.
    * Scenario:
    * Instance0: Slave, Instance1: Master, Instance2: Slave
-   * PreviousAssignment of Task: Instance0: Running
    * CurrentState: Instance0: Running, Instance1: Running
    * Expected paMap: Instance0 -> Dropped
    */
@@ -91,8 +88,6 @@ public class TestTargetedTaskStateChange {
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
     when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
     when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
-    when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
-        .thenReturn(mock._resourceAssignment);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -114,12 +109,9 @@ public class TestTargetedTaskStateChange {
   }
 
   /**
-   * This test checks the behaviour of the controller while there is one current state which is
-   * different from
-   * Previous Assignment information.
+   * This test checks the behaviour of the controller while there is one current state.
    * Scenario:
    * Instance0: Slave, Instance1: Master, Instance2: Slave
-   * PreviousAssignment of Task: Instance0: Dropped
    * CurrentState: Instance0: Running
    * Expected paMap: Instance1 -> Running
    */
@@ -133,8 +125,6 @@ public class TestTargetedTaskStateChange {
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
     when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
     when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
-    when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
-        .thenReturn(mock._resourceAssignment2);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -291,15 +281,6 @@ public class TestTargetedTaskStateChange {
     return currentStateOutput;
   }
 
-  private ResourceAssignment preparePreviousAssignment(String instance, String state) {
-    ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME);
-    Map<String, String> replicaMap = new HashMap<>();
-    replicaMap.put(instance, state);
-    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
-    prevAssignment.addReplicaMap(taskPartition, replicaMap);
-    return prevAssignment;
-  }
-
   private class MockTestInformation {
     private static final String SLAVE_INSTANCE = INSTANCE_PREFIX + "0";
     private static final String MASTER_INSTANCE = INSTANCE_PREFIX + "1";
@@ -316,10 +297,6 @@ public class TestTargetedTaskStateChange {
         SLAVE_INSTANCE, TaskPartitionState.RUNNING.name(), TaskPartitionState.RUNNING.name());
     private CurrentStateOutput _currentStateOutput2 =
         prepareCurrentState2(MASTER_INSTANCE, TaskPartitionState.RUNNING.name());
-    private ResourceAssignment _resourceAssignment =
-        preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.RUNNING.name());
-    private ResourceAssignment _resourceAssignment2 =
-        preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.DROPPED.name());
     private TaskDataCache _taskDataCache = mock(TaskDataCache.class);
     private RuntimeJobDag _runtimeJobDag = mock(RuntimeJobDag.class);