You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/21 23:00:36 UTC

[helix] 02/03: Task Current State Migration: helix-rest, utils, tests changes (#1579)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit db94b22c3b92b85f150680225b230c9df3882ba5
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Dec 14 14:00:56 2020 -0800

    Task Current State Migration: helix-rest, utils, tests changes (#1579)
    
    The first part of the task current state migration.
    All changes made in this commit are on the controller side and are non-pipeline related.
---
 .../tools/commandtools/CurrentStateCleanUp.java    | 24 +++++++++++++++-------
 .../src/test/java/org/apache/helix/TestHelper.java |  7 +++++++
 .../helix/controller/stages/TestTaskStage.java     |  3 ++-
 .../messaging/handling/MockHelixTaskExecutor.java  |  5 +++++
 .../resources/helix/PerInstanceAccessor.java       |  8 +++++++-
 5 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
index c657490..31c3cc7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
@@ -19,6 +19,7 @@ package org.apache.helix.tools.commandtools;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -124,18 +125,27 @@ public class CurrentStateCleanUp {
       LOG.info(String.format("Processing cleaning current state for instance: %s", instanceName));
       List<String> currentStateNames =
           accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, session));
-      for (String currentStateName : currentStateNames) {
-        PropertyKey key =
-            accessor.keyBuilder().currentState(instanceName, session, currentStateName);
+      List<String> taskCurrentStateNames =
+          accessor.getChildNames(accessor.keyBuilder().taskCurrentStates(instanceName, session));
+      List<PropertyKey> allCurrentStateKeys = new ArrayList<>();
+      currentStateNames.stream()
+          .map(name -> accessor.keyBuilder().currentState(instanceName, session, name))
+          .forEach(allCurrentStateKeys::add);
+      taskCurrentStateNames.stream()
+          .map(name -> accessor.keyBuilder().taskCurrentState(instanceName, session, name))
+          .forEach(allCurrentStateKeys::add);
+
+      List<String> pathsToRemove = new ArrayList<>();
+      for (PropertyKey key : allCurrentStateKeys) {
         accessor.getBaseDataAccessor().update(key.getPath(), updater, AccessOption.PERSISTENT);
         CurrentState currentState = accessor.getProperty(key);
         if (currentState.getPartitionStateMap().size() == 0) {
-          accessor.getBaseDataAccessor().remove(key.getPath(), AccessOption.PERSISTENT);
-          LOG.info(String.format("Remove current state for instance: %s, resource %s", instanceName,
-              currentStateName));
+          pathsToRemove.add(key.getPath());
+          LOG.info(String.format("Remove current state for path %s", key.getPath()));
         }
-
       }
+
+      accessor.getBaseDataAccessor().remove(pathsToRemove, AccessOption.PERSISTENT);
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index c805158..79f238d 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -240,6 +240,13 @@ public class TestHelper {
           if (curState != null && curState.getRecord().getMapFields().size() != 0) {
             return false;
           }
+
+          CurrentState taskCurState =
+              accessor.getProperty(keyBuilder.taskCurrentState(instanceName, sessionId, resourceName));
+
+          if (taskCurState != null && taskCurState.getRecord().getMapFields().size() != 0) {
+            return false;
+          }
         }
 
         ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index 0810e09..b0d8da9 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -48,7 +48,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestTaskStage extends TaskTestBase {
-  private ClusterEvent _event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
+  private ClusterEvent _event =
+      new ClusterEvent(CLUSTER_NAME, ClusterEventType.TaskCurrentStateChange);
   private PropertyKey.Builder _keyBuilder;
   private String _testWorkflow = TestHelper.getTestClassName();
   private String _testJobPrefix = _testWorkflow + "_Job_";
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
index 0b43a90..aa36b4f 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
@@ -57,6 +57,11 @@ public class MockHelixTaskExecutor extends HelixTaskExecutor {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     PropertyKey path = keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId());
     Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path, true);
+    // Also add the task path
+    PropertyKey taskPath =
+        keyBuilder.taskCurrentStates(manager.getInstanceName(), manager.getSessionId());
+    Map<String, CurrentState> taskCurrentStateMap = accessor.getChildValuesMap(taskPath, true);
+    taskCurrentStateMap.forEach(currentStateMap::putIfAbsent);
 
     Set<String> seenPartitions = new HashSet<>();
     for (Message message : messages) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 8785796..1560d74 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -386,7 +386,9 @@ public class PerInstanceAccessor extends AbstractHelixResource {
 
     List<String> resources =
         accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, currentSessionId));
-    if (resources != null && resources.size() > 0) {
+    resources.addAll(accessor
+        .getChildNames(accessor.keyBuilder().taskCurrentStates(instanceName, currentSessionId)));
+    if (resources.size() > 0) {
       resourcesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(resources));
     }
 
@@ -409,6 +411,10 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     String currentSessionId = sessionIds.get(0);
     CurrentState resourceCurrentState = accessor.getProperty(
         accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName));
+    if (resourceCurrentState == null) {
+      resourceCurrentState = accessor.getProperty(
+          accessor.keyBuilder().taskCurrentState(instanceName, currentSessionId, resourceName));
+    }
     if (resourceCurrentState != null) {
       return JSONRepresentation(resourceCurrentState.getRecord());
     }