Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/12/02 06:12:45 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #1550: Controller-side Task Current State Migration

jiajunwang commented on a change in pull request #1550:
URL: https://github.com/apache/helix/pull/1550#discussion_r533908911



##########
File path: helix-core/src/main/java/org/apache/helix/HelixConstants.java
##########
@@ -34,6 +34,7 @@
     CLUSTER_CONFIG (PropertyType.CONFIGS),
     LIVE_INSTANCE (PropertyType.LIVEINSTANCES),
     CURRENT_STATE (PropertyType.CURRENTSTATES),
+    TASK_CURRENT_STATE (PropertyType.TASKCURRENTSTATES),

Review comment:
       Nit, but it might be important: can we just call it TASK_STATE for simplicity?

##########
File path: helix-core/src/main/java/org/apache/helix/PropertyKey.java
##########
@@ -485,6 +486,50 @@ public PropertyKey currentState(String instanceName, String sessionId, String re
       }
     }
 
+    /**
+     * Get a property key associated with {@link CurrentState} of an instance and session. This key
+     * is for TaskCurrentState specifically.
+     * @param instanceName
+     * @param sessionId
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey taskCurrentStates(String instanceName, String sessionId) {
+      return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, _clusterName, instanceName,
+          sessionId);
+    }
+
+    /**
+     * Get a property key associated with {@link CurrentState} of an instance, session, and
+     * resource. This key is for TaskCurrentState specifically.
+     * @param instanceName
+     * @param sessionId
+     * @param resourceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey taskCurrentState(String instanceName, String sessionId, String resourceName) {

Review comment:
       Shall we call it jobName instead of resourceName?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -89,6 +89,13 @@ public void process(ClusterEvent event) throws Exception {
       String instanceSessionId = instance.getEphemeralOwner();
 
       // update current states.
+      // Like ResourceComputationStage, we give priority to regular resources, so update task ones
+      // first and allow regular ones to overwrite if there's any name conflicts.
+      if (_isTaskFrameworkPipeline) {
+        Map<String, CurrentState> taskCurrentStateMap = ((WorkflowControllerDataProvider) cache)

Review comment:
       Strictly speaking, the pipelines are already split (we already have 2 pipelines), so there won't be a separate project to split the pipeline further. The project we planned is to split the code. If you can do that in this PR, it will save us some later work. Otherwise, any new code you add here behind an _isTaskFrameworkPipeline branch might need to be rewritten soon : )
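To illustrate splitting the code rather than branching, here is a minimal sketch in which each pipeline gets its own stage subclass and the shared logic never checks which pipeline it runs in. All names (StateSource, CurrentStateStage, ResourceCurrentStateStage, TaskCurrentStateStage) are hypothetical, and current states are reduced to resource name -> state strings; this is not the Helix stage API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory source of current states, keyed by resource name.
// Stands in for the controller data provider caches; not a Helix class.
final class StateSource {
  final Map<String, String> regular = new HashMap<>();
  final Map<String, String> task = new HashMap<>();
}

// Shared stage contract: no pipeline-type check, only a hook to override.
abstract class CurrentStateStage {
  abstract Map<String, String> collect(StateSource source);
}

// Regular (resource) pipeline: reads only regular current states.
final class ResourceCurrentStateStage extends CurrentStateStage {
  @Override
  Map<String, String> collect(StateSource source) {
    return new HashMap<>(source.regular);
  }
}

// Task pipeline: task states are added first, then regular states overwrite
// any name conflicts, mirroring the priority described in the diff comment.
final class TaskCurrentStateStage extends CurrentStateStage {
  @Override
  Map<String, String> collect(StateSource source) {
    Map<String, String> merged = new HashMap<>(source.task);
    merged.putAll(source.regular);
    return merged;
  }
}
```

Each pipeline would register only its own stage, so the regular pipeline never reads task current states and no _isTaskFrameworkPipeline check is needed inside the stage.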

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
##########
@@ -79,7 +79,25 @@ public void process(ClusterEvent event) throws Exception {
 
     // It's important to get partitions from CurrentState as well since the
     // idealState might be removed.
-    processCurrentStates(cache, resourceMap, resourceToRebalance, idealStates, isTaskCache);
+    Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+    for (LiveInstance instance : availableInstances.values()) {
+      String instanceName = instance.getInstanceName();
+      String clientSessionId = instance.getEphemeralOwner();
+
+      Map<String, CurrentState> currentStateMap =
+          cache.getCurrentState(instanceName, clientSessionId);
+      processCurrentStates(currentStateMap, resourceMap, resourceToRebalance, idealStates,

Review comment:
       Since processCurrentStates() already takes isTaskCache, can we move this complex logic into the processCurrentStates() method?
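A minimal sketch of folding the live-instance loop into processCurrentStates(), so the caller keeps passing the cache plus isTaskCache as before. The Cache class and its getTaskCurrentState accessor are hypothetical simplifications (current states reduced to resource name -> state model definition name), not the real Helix data provider API. Task and regular states are collected separately and merged at the end, so a regular resource wins a name conflict even when the two entries come from different instances.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified controller cache (not a Helix class). Current
// states are keyed by "instance/session" and reduced to
// resource name -> state model definition name.
final class Cache {
  final Map<String, String> liveSessions = new HashMap<>(); // instance -> session id
  final Map<String, Map<String, String>> currentStates = new HashMap<>();
  final Map<String, Map<String, String>> taskCurrentStates = new HashMap<>();

  Map<String, String> getCurrentState(String instance, String session) {
    return currentStates.getOrDefault(instance + "/" + session, Collections.emptyMap());
  }

  // Assumed accessor name for task current states.
  Map<String, String> getTaskCurrentState(String instance, String session) {
    return taskCurrentStates.getOrDefault(instance + "/" + session, Collections.emptyMap());
  }
}

final class ResourceComputation {
  // The per-instance loop lives inside the method; isTaskCache decides
  // whether task current states are consulted at all.
  static Map<String, String> processCurrentStates(Cache cache, boolean isTaskCache) {
    Map<String, String> fromTasks = new HashMap<>();
    Map<String, String> fromRegular = new HashMap<>();
    for (Map.Entry<String, String> live : cache.liveSessions.entrySet()) {
      String instance = live.getKey();
      String session = live.getValue();
      if (isTaskCache) {
        fromTasks.putAll(cache.getTaskCurrentState(instance, session));
      }
      fromRegular.putAll(cache.getCurrentState(instance, session));
    }
    // Regular resources take priority on name conflicts.
    fromTasks.putAll(fromRegular);
    return fromTasks;
  }
}
```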

##########
File path: helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
##########
@@ -78,14 +82,17 @@ private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> prope
         // This check (and set) is necessary for now since the current state flag in
         // _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
         _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
-            || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false)
-            || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
+            || _propertyDataChangedMap.get(HelixConstants.ChangeType.TASK_CURRENT_STATE)
+            .getAndSet(false) || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE)
+            .getAndSet(false) || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
+            || propertyRefreshed.contains(HelixConstants.ChangeType.TASK_CURRENT_STATE)
             || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
     Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);

Review comment:
       It seems you did not update the propertyRefreshed flag for _taskCurrentStateCache, so refreshClusterStateChangeFlags() won't work as expected.
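A minimal sketch of the kind of fix being asked for, assuming the task current state cache's refresh can report whether anything changed: when it does, TASK_CURRENT_STATE is added to propertyRefreshed before refreshClusterStateChangeFlags() runs, so the propertyRefreshed.contains(...TASK_CURRENT_STATE) check in the diff can actually fire. ChangeType, TaskCurrentStateCache, its boolean-returning refresh(int), and DataProvider are hypothetical simplifications (the _propertyDataChangedMap handling is omitted), not Helix signatures.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified stand-in for HelixConstants.ChangeType.
enum ChangeType { CURRENT_STATE, TASK_CURRENT_STATE, LIVE_INSTANCE, MESSAGE }

// Hypothetical task current state cache; refresh() returns true when its
// contents changed (modeled here as a version bump).
final class TaskCurrentStateCache {
  private int version = -1;

  boolean refresh(int latestVersion) {
    boolean changed = latestVersion != version;
    version = latestVersion;
    return changed;
  }
}

final class DataProvider {
  private final TaskCurrentStateCache taskCurrentStateCache = new TaskCurrentStateCache();
  private boolean clusterStateChanged;

  void refresh(Set<ChangeType> refreshedBySuper, int taskStateVersion) {
    Set<ChangeType> propertyRefreshed = EnumSet.noneOf(ChangeType.class);
    propertyRefreshed.addAll(refreshedBySuper);
    // The step the review points out as missing: record the task current
    // state refresh so the flag computation below can see it.
    if (taskCurrentStateCache.refresh(taskStateVersion)) {
      propertyRefreshed.add(ChangeType.TASK_CURRENT_STATE);
    }
    refreshClusterStateChangeFlags(propertyRefreshed);
  }

  private void refreshClusterStateChangeFlags(Set<ChangeType> propertyRefreshed) {
    clusterStateChanged = propertyRefreshed.contains(ChangeType.CURRENT_STATE)
        || propertyRefreshed.contains(ChangeType.TASK_CURRENT_STATE)
        || propertyRefreshed.contains(ChangeType.LIVE_INSTANCE);
  }

  boolean isClusterStateChanged() {
    return clusterStateChanged;
  }
}
```

Without the if block, a change that only touches task current states would leave clusterStateChanged false, which is the symptom described above.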




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
