Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/11/24 19:36:54 UTC

[GitHub] [helix] alirezazamani commented on a change in pull request #1550: Controller-side Task Current State Migration

alirezazamani commented on a change in pull request #1550:
URL: https://github.com/apache/helix/pull/1550#discussion_r529817108



##########
File path: helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
##########
@@ -78,14 +82,17 @@ private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> prope
         // This check (and set) is necessary for now since the current state flag in
         // _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
         _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
-            || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false)
-            || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
+            || _propertyDataChangedMap.get(HelixConstants.ChangeType.TASK_CURRENT_STATE)
+            .getAndSet(false) || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE)
+            .getAndSet(false) || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
+            || propertyRefreshed.contains(HelixConstants.ChangeType.TASK_CURRENT_STATE)
             || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
     Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+    _taskCurrentStateCache.refresh(accessor, getLiveInstanceCache().getPropertyMap());

Review comment:
       Why do we need to refresh here? Can TaskCurrentState be refreshed the same way as regular current states?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
##########
@@ -44,6 +44,7 @@
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
 import org.apache.helix.common.caches.PropertyCache;
+import org.apache.helix.common.caches.TaskCurrentStateCache;

Review comment:
       Unused import?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -79,7 +79,25 @@ public void process(ClusterEvent event) throws Exception {
 
     // It's important to get partitions from CurrentState as well since the
     // idealState might be removed.
-    processCurrentStates(cache, resourceMap, resourceToRebalance, idealStates, isTaskCache);
+    Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+    for (LiveInstance instance : availableInstances.values()) {
+      String instanceName = instance.getInstanceName();
+      String clientSessionId = instance.getEphemeralOwner();
+
+      Map<String, CurrentState> currentStateMap =
+          cache.getCurrentState(instanceName, clientSessionId);
+      processCurrentStates(currentStateMap, resourceMap, resourceToRebalance, idealStates,
+          isTaskCache);
+      // Duplicate resource names between regular and task resources may happen, but most likely
+      // won't. If it does, let regular resources overwrite task resources. To avoid duplicate
+      // resource overwriting, it's better to split regular and task pipelines entirely.

Review comment:
       Good comment
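
       To make the overwrite order in that code comment concrete, here is a minimal sketch using plain maps. It is not the Helix implementation: the class name, the merge method, and the string values are hypothetical, and it only assumes that task entries are written first and regular entries second.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: these names are illustrative and not part of Helix.
public class OverwriteOrderSketch {
  // Copy the task entries first, then the regular entries, so a regular
  // resource wins whenever both maps contain the same resource name.
  static Map<String, String> merge(Map<String, String> taskResources,
      Map<String, String> regularResources) {
    Map<String, String> merged = new LinkedHashMap<>(taskResources);
    merged.putAll(regularResources); // a later put replaces the earlier value
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> task = Map.of("sharedName", "task", "jobOnly", "task");
    Map<String, String> regular = Map.of("sharedName", "regular", "dbOnly", "regular");
    Map<String, String> merged = merge(task, regular);
    // The colliding name resolves to the regular entry.
    System.out.println(merged.get("sharedName")); // prints "regular"
  }
}
```

       In this sketch the task entry for a colliding name is silently lost, which is why fully separate pipelines would avoid the problem.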

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -89,6 +90,13 @@ public void process(ClusterEvent event) throws Exception {
       String instanceSessionId = instance.getEphemeralOwner();
 
       // update current states.
+      // Like ResourceComputationStage, we give priority to regular resources, so update task ones
+      // first and allow regular ones to overwrite if there's any name conflicts.
+      if (_isTaskFrameworkPipeline) {

Review comment:
       Assuming we only refresh the task current state for the task pipeline, I think this answers my question above. 👍
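
       A rough sketch of that assumption follows. It assumes a boolean flag decides whether task states are read at all. The class, method, and parameter names are hypothetical, and the body of the real `if (_isTaskFrameworkPipeline)` block is not shown in this hunk.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: names are illustrative only, not Helix APIs.
public class PipelineGateSketch {
  // Task states are included only for the task pipeline, and they are
  // applied first so regular states take priority on a name conflict.
  static Map<String, String> collectStates(boolean isTaskPipeline,
      Map<String, String> taskStates, Map<String, String> regularStates) {
    Map<String, String> result = new HashMap<>();
    if (isTaskPipeline) {
      result.putAll(taskStates);
    }
    result.putAll(regularStates);
    return result;
  }
}
```

       Under this assumption, a regular pipeline never sees task-only entries, so it would not need the task current state refreshed.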



