You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/21 23:00:36 UTC
[helix] 02/03: Task Current State Migration: helix-rest, utils, tests changes (#1579)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit db94b22c3b92b85f150680225b230c9df3882ba5
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Dec 14 14:00:56 2020 -0800
Task Current State Migration: helix-rest, utils, tests changes (#1579)
This is the first part of the task current state migration.
All changes made in this commit are on the controller side and are not pipeline-related.
---
.../tools/commandtools/CurrentStateCleanUp.java | 24 +++++++++++++++-------
.../src/test/java/org/apache/helix/TestHelper.java | 7 +++++++
.../helix/controller/stages/TestTaskStage.java | 3 ++-
.../messaging/handling/MockHelixTaskExecutor.java | 5 +++++
.../resources/helix/PerInstanceAccessor.java | 8 +++++++-
5 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
index c657490..31c3cc7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
@@ -19,6 +19,7 @@ package org.apache.helix.tools.commandtools;
* under the License.
*/
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -124,18 +125,27 @@ public class CurrentStateCleanUp {
LOG.info(String.format("Processing cleaning current state for instance: %s", instanceName));
List<String> currentStateNames =
accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, session));
- for (String currentStateName : currentStateNames) {
- PropertyKey key =
- accessor.keyBuilder().currentState(instanceName, session, currentStateName);
+ List<String> taskCurrentStateNames =
+ accessor.getChildNames(accessor.keyBuilder().taskCurrentStates(instanceName, session));
+ List<PropertyKey> allCurrentStateKeys = new ArrayList<>();
+ currentStateNames.stream()
+ .map(name -> accessor.keyBuilder().currentState(instanceName, session, name))
+ .forEach(allCurrentStateKeys::add);
+ taskCurrentStateNames.stream()
+ .map(name -> accessor.keyBuilder().taskCurrentState(instanceName, session, name))
+ .forEach(allCurrentStateKeys::add);
+
+ List<String> pathsToRemove = new ArrayList<>();
+ for (PropertyKey key : allCurrentStateKeys) {
accessor.getBaseDataAccessor().update(key.getPath(), updater, AccessOption.PERSISTENT);
CurrentState currentState = accessor.getProperty(key);
if (currentState.getPartitionStateMap().size() == 0) {
- accessor.getBaseDataAccessor().remove(key.getPath(), AccessOption.PERSISTENT);
- LOG.info(String.format("Remove current state for instance: %s, resource %s", instanceName,
- currentStateName));
+ pathsToRemove.add(key.getPath());
+ LOG.info(String.format("Remove current state for path %s", key.getPath()));
}
-
}
+
+ accessor.getBaseDataAccessor().remove(pathsToRemove, AccessOption.PERSISTENT);
} catch (Exception e) {
e.printStackTrace();
} finally {
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index c805158..79f238d 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -240,6 +240,13 @@ public class TestHelper {
if (curState != null && curState.getRecord().getMapFields().size() != 0) {
return false;
}
+
+ CurrentState taskCurState =
+ accessor.getProperty(keyBuilder.taskCurrentState(instanceName, sessionId, resourceName));
+
+ if (taskCurState != null && taskCurState.getRecord().getMapFields().size() != 0) {
+ return false;
+ }
}
ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index 0810e09..b0d8da9 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -48,7 +48,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestTaskStage extends TaskTestBase {
- private ClusterEvent _event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
+ private ClusterEvent _event =
+ new ClusterEvent(CLUSTER_NAME, ClusterEventType.TaskCurrentStateChange);
private PropertyKey.Builder _keyBuilder;
private String _testWorkflow = TestHelper.getTestClassName();
private String _testJobPrefix = _testWorkflow + "_Job_";
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
index 0b43a90..aa36b4f 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java
@@ -57,6 +57,11 @@ public class MockHelixTaskExecutor extends HelixTaskExecutor {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
PropertyKey path = keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId());
Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path, true);
+ // Also add the task path
+ PropertyKey taskPath =
+ keyBuilder.taskCurrentStates(manager.getInstanceName(), manager.getSessionId());
+ Map<String, CurrentState> taskCurrentStateMap = accessor.getChildValuesMap(taskPath, true);
+ taskCurrentStateMap.forEach(currentStateMap::putIfAbsent);
Set<String> seenPartitions = new HashSet<>();
for (Message message : messages) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 8785796..1560d74 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -386,7 +386,9 @@ public class PerInstanceAccessor extends AbstractHelixResource {
List<String> resources =
accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, currentSessionId));
- if (resources != null && resources.size() > 0) {
+ resources.addAll(accessor
+ .getChildNames(accessor.keyBuilder().taskCurrentStates(instanceName, currentSessionId)));
+ if (resources.size() > 0) {
resourcesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(resources));
}
@@ -409,6 +411,10 @@ public class PerInstanceAccessor extends AbstractHelixResource {
String currentSessionId = sessionIds.get(0);
CurrentState resourceCurrentState = accessor.getProperty(
accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName));
+ if (resourceCurrentState == null) {
+ resourceCurrentState = accessor.getProperty(
+ accessor.keyBuilder().taskCurrentState(instanceName, currentSessionId, resourceName));
+ }
if (resourceCurrentState != null) {
return JSONRepresentation(resourceCurrentState.getRecord());
}