You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/25 00:48:08 UTC

helix git commit: [HELIX-745] Make AssignableInstanceManager listen on data changes to update AssignableInstances

Repository: helix
Updated Branches:
  refs/heads/master 675904095 -> 0af6e8c19


[HELIX-745] Make AssignableInstanceManager listen on data changes to update AssignableInstances

Previously, although AssignableInstanceManager provided an API for updating its AssignableInstances, that API was never called. This change fixes that by invoking it from the ClusterDataCache refresh path whenever the relevant cluster data changes.

Changelist:
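1. Add two boolean flags in ClusterDataCache: one for ClusterConfig changes and one for LiveInstance/InstanceConfig changes
2. If the ClusterDataCache is a task cache, call AssignableInstanceManager.buildAssignableInstances() when the ClusterConfig flag is set, or AssignableInstanceManager.updateAssignableInstances() when only the LiveInstance/InstanceConfig flag is set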
3. Use thread-safe map in AssignableInstanceManager
4. Address the issue of targeted tasks having null taskIds (use pName convention instead)
5. Address the issue of LiveInstanceChange not notifying the caches by explicitly setting the change flags in the setLiveInstance() function
6. Fix bug in restoreTaskAssignResult where tasks with null quota type were not being restored properly


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0af6e8c1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0af6e8c1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0af6e8c1

Branch: refs/heads/master
Commit: 0af6e8c19af5ee916f93acd8582e53b776e9c712
Parents: 6759040
Author: Hunter Lee <na...@gmail.com>
Authored: Tue Jul 24 14:10:01 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Tue Jul 24 17:47:50 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/TaskDataCache.java      | 24 -----------
 .../controller/stages/ClusterDataCache.java     | 39 +++++++++++++++--
 .../helix/task/AssignableInstanceManager.java   | 36 +++++++++-------
 .../task/TestQuotaBasedScheduling.java          | 44 ++++++++++++++++++++
 4 files changed, 99 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 478b03a..8892d2e 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -72,7 +72,6 @@ public class TaskDataCache extends AbstractDataCache {
   public synchronized boolean refresh(HelixDataAccessor accessor,
       Map<String, ResourceConfig> resourceConfigMap) {
     refreshJobContexts(accessor);
-
     // update workflow and job configs.
     _workflowConfigMap.clear();
     _jobConfigMap.clear();
@@ -85,32 +84,9 @@ public class TaskDataCache extends AbstractDataCache {
         _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue()));
       }
     }
-
     return true;
   }
 
-  /**
-   * Refreshes Task Framework contexts and configs from ZooKeeper. This method also re-instantiates
-   * AssignableInstanceManager.
-   * @param accessor
-   * @param resourceConfigMap
-   * @param liveInstanceMap
-   * @param instanceConfigMap
-   * @return
-   */
-  public synchronized boolean refresh(HelixDataAccessor accessor,
-      Map<String, ResourceConfig> resourceConfigMap, ClusterConfig clusterConfig,
-      Map<String, LiveInstance> liveInstanceMap, Map<String, InstanceConfig> instanceConfigMap) {
-    // First, call the original refresh for contexts and configs
-    if (refresh(accessor, resourceConfigMap)) {
-      // Upon refresh success, re-instantiate AssignableInstanceManager from scratch
-      _assignableInstanceManager.buildAssignableInstances(clusterConfig, this, liveInstanceMap,
-          instanceConfigMap);
-      return true;
-    }
-    return false;
-  }
-
   private void refreshJobContexts(HelixDataAccessor accessor) {
     // TODO: Need an optimize for reading context only if the refresh is needed.
     long start = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 1f32c04..3e6bd86 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -114,8 +114,13 @@ public class ClusterDataCache {
   private String _clusterName;
 
   // For detecting liveinstance and target resource partition state change in task assignment
+  // Used in AbstractTaskDispatcher
   private boolean _existsLiveInstanceOrCurrentStateChange = false;
 
+  // These two flags are used to detect ClusterConfig change or LiveInstance/InstanceConfig change
+  private boolean _existsClusterConfigChange = false;
+  private boolean _existsInstanceChange = false;
+
   public ClusterDataCache() {
     this(null);
   }
@@ -153,7 +158,6 @@ public class ClusterDataCache {
     }
 
     if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
-      _existsLiveInstanceOrCurrentStateChange = true;
       startTime = System.currentTimeMillis();
       _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, false);
       clearCachedResourceAssignments();
@@ -166,6 +170,7 @@ public class ClusterDataCache {
     }
 
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
+      _existsInstanceChange = true;
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false);
       clearCachedResourceAssignments();
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
@@ -182,13 +187,20 @@ public class ClusterDataCache {
           + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
 
-    // This is for target jobs' task assignment. It needs to watch for current state changes for
+    // This is for targeted jobs' task assignment. It needs to watch for current state changes for
     // when targeted resources' state transitions complete
     if (_propertyDataChangedMap.get(ChangeType.CURRENT_STATE)) {
       _existsLiveInstanceOrCurrentStateChange = true;
       _propertyDataChangedMap.put(ChangeType.CURRENT_STATE, false);
     }
 
+    // This is for AssignableInstances. Whenever there is a quota config change in ClusterConfig, we
+    // must trigger an update to AssignableInstanceManager
+    if (_propertyDataChangedMap.get(ChangeType.CLUSTER_CONFIG)) {
+      _existsClusterConfigChange = true;
+      _propertyDataChangedMap.put(ChangeType.CLUSTER_CONFIG, false);
+    }
+
     _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
     _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
     _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap);
@@ -205,8 +217,23 @@ public class ClusterDataCache {
     _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
 
     if (_isTaskCache) {
-      _taskDataCache.refresh(accessor, _resourceConfigMap, _clusterConfig, _liveInstanceMap,
-          _instanceConfigMap);
+      // Refresh TaskCache
+      _taskDataCache.refresh(accessor, _resourceConfigMap);
+
+      // Refresh AssignableInstanceManager
+      AssignableInstanceManager assignableInstanceManager =
+          _taskDataCache.getAssignableInstanceManager();
+      if (_existsClusterConfigChange) {
+        // Update both flags since buildAssignableInstances includes updateAssignableInstances
+        _existsClusterConfigChange = false;
+        _existsInstanceChange = false;
+        assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache,
+            _liveInstanceMap, _instanceConfigMap);
+      } else if (_existsInstanceChange) {
+        _existsInstanceChange = false;
+        assignableInstanceManager.updateAssignableInstances(_clusterConfig, _liveInstanceMap,
+            _instanceConfigMap);
+      }
     }
 
     _instanceMessagesCache.refresh(accessor, _liveInstanceMap);
@@ -423,6 +450,10 @@ public class ClusterDataCache {
     }
     _liveInstanceCacheMap = liveInstanceMap;
     _updateInstanceOfflineTime = true;
+
+    // TODO: Move this when listener for LiveInstance is being refactored
+    _existsInstanceChange = true;
+    _existsLiveInstanceOrCurrentStateChange = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index e25e23a..abe5f1c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
@@ -41,21 +42,19 @@ public class AssignableInstanceManager {
   private Map<String, AssignableInstance> _assignableInstanceMap;
   // TaskID -> TaskAssignResult TODO: Hunter: Move this if not needed
   private Map<String, TaskAssignResult> _taskAssignResultMap;
-  private boolean _hasBeenBuilt; // Flag for whether AssignableInstances have been built
 
   /**
    * Basic constructor for AssignableInstanceManager to allow an empty instantiation.
    * buildAssignableInstances() must be explicitly called after instantiation.
    */
   public AssignableInstanceManager() {
-    _assignableInstanceMap = new HashMap<>();
-    _taskAssignResultMap = new HashMap<>();
-    _hasBeenBuilt = false; // AssignableInstances haven't been built
+    _assignableInstanceMap = new ConcurrentHashMap<>();
+    _taskAssignResultMap = new ConcurrentHashMap<>();
   }
 
   /**
    * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from
-   * TaskDataCache.
+   * TaskDataCache. It re-computes current quota profile for each AssignableInstance.
    * @param clusterConfig
    * @param taskDataCache
    * @param liveInstances
@@ -63,13 +62,9 @@ public class AssignableInstanceManager {
    */
   public void buildAssignableInstances(ClusterConfig clusterConfig, TaskDataCache taskDataCache,
       Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) {
-    // Only need to build from scratch during Controller switch, etc.
-    // This keeps the pipeline from building from scratch every cache refresh
-    if (_hasBeenBuilt) {
-      // If it has been already built, just update (configs and LiveInstance changes may be present)
-      updateAssignableInstances(clusterConfig, liveInstances, instanceConfigs);
-      return;
-    }
+    // Reset all cached information
+    _assignableInstanceMap.clear();
+    _taskAssignResultMap.clear();
 
     // Create all AssignableInstance objects based on what's in liveInstances
     for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
@@ -100,6 +95,9 @@ public class AssignableInstanceManager {
         continue; // Ignore this job if either the config or context is null
       }
       String quotaType = jobConfig.getJobType();
+      if (quotaType == null) {
+        quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+      }
       Set<Integer> taskIndices = jobContext.getPartitionSet(); // Each integer represents a task in
       // this job (this is NOT taskId)
       for (int taskIndex : taskIndices) {
@@ -111,6 +109,11 @@ public class AssignableInstanceManager {
 
           String assignedInstance = jobContext.getAssignedParticipant(taskIndex);
           String taskId = jobContext.getTaskIdForPartition(taskIndex);
+          if (taskId == null) {
+            // For targeted tasks, taskId will be null
+            // We instead use pName (see FixedTargetTaskAssignmentCalculator)
+            taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
+          }
           if (assignedInstance == null) {
             LOG.warn(
                 "This task's TaskContext does not have an assigned instance! Task will be ignored. "
@@ -138,13 +141,13 @@ public class AssignableInstanceManager {
         }
       }
     }
-    _hasBeenBuilt = true; // Set the flag so that it's not re-building from cache every pipeline
-    // iteration
   }
 
   /**
-   * Updates AssignableInstances when there are any config changes. This update will be based on the
-   * list of LiveInstances provided.
+   * Updates AssignableInstances when there are changes in LiveInstances or InstanceConfig. This
+   * update only keeps an up-to-date count of AssignableInstances and does NOT re-build tasks
+   * (because it's costly).
+   * Call this when there is only LiveInstance/InstanceConfig change.
    * @param clusterConfig
    * @param liveInstances
    * @param instanceConfigs
@@ -156,6 +159,7 @@ public class AssignableInstanceManager {
     Collection<AssignableInstance> staleAssignableInstances =
         new HashSet<>(_assignableInstanceMap.values());
 
+    // Loop over new LiveInstances
     for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
       // Prepare instance-specific metadata
       String instanceName = liveInstanceEntry.getKey();

http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index db471ff..abbcf75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -35,6 +35,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
@@ -260,6 +261,49 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
     Assert.assertFalse(_quotaTypeExecutionCount.containsKey(DEFAULT_QUOTA_TYPE));
   }
 
+  @Test(dependsOnMethods = "testSchedulingWithoutQuota")
+  public void testQuotaConfigChange() throws InterruptedException {
+    ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+    clusterConfig.resetTaskQuotaRatioMap();
+    clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 38);
+    clusterConfig.setTaskQuotaRatio("A", 1);
+    clusterConfig.setTaskQuotaRatio("B", 1);
+    _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // 2 nodes - create 4 workflows with LongTask so that only 2 would get scheduled and run
+    for (int i = 0; i < 4; i++) {
+      String workflowName = TestHelper.getTestMethodName() + "_" + i;
+      _driver.start(createWorkflow(workflowName, true, "A", 1, 1, "LongTask"));
+      Thread.sleep(500L);
+    }
+    // Test that only 2 of the workflows are executed
+    for (int i = 0; i < 2; i++) {
+      String workflowName = TestHelper.getTestMethodName() + "_" + i;
+      _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+    }
+
+    // Test that the next two are not executing
+    JobContext context_2 = _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
+    JobContext context_3 = _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
+    Assert.assertNull(context_2.getPartitionState(0));
+    Assert.assertNull(context_3.getPartitionState(0));
+
+    // Change the quota config so that the rest of the workflows are in progress
+    clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+    clusterConfig.resetTaskQuotaRatioMap();
+    clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 1);
+    clusterConfig.setTaskQuotaRatio("A", 38);
+    clusterConfig.setTaskQuotaRatio("B", 1);
+    _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Wait for the change to propagate through and test that the next two are now executing
+    Thread.sleep(1000L);
+    context_2 = _driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
+    context_3 = _driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
+    Assert.assertNotNull(context_2.getPartitionState(0));
+    Assert.assertNotNull(context_3.getPartitionState(0));
+  }
+
   /**
    * Tests that quota ratios are being observed. This is done by creating short tasks for some quota
    * types and long tasks for some quota types.