You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/21 23:00:37 UTC
[helix] 03/03: Participant-side Task Current State Migration (#1584)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 361e18da4152c0146daa9d9dc7929f1f2bdcd9dc
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Tue Dec 15 14:26:53 2020 -0800
Participant-side Task Current State Migration (#1584)
The second part of the task current state migration.
All changes made in this commit are on the participant side.
---
.../java/org/apache/helix/SystemPropertyKeys.java | 3 +
.../handling/HelixStateTransitionHandler.java | 27 ++++--
.../messaging/handling/HelixTaskExecutor.java | 15 +++-
.../java/org/apache/helix/task/TaskRunner.java | 6 +-
.../TestRoutingTableProviderFromCurrentStates.java | 95 ++++++++++++++++++++
.../task/TestCurrentStateDropWithoutConfigs.java | 4 +-
.../task/TestDropCurrentStateRunningTask.java | 12 +--
.../integration/task/TestTaskCurrentStateDrop.java | 16 ++--
.../integration/task/TestTaskCurrentStateNull.java | 8 +-
.../task/TestTaskCurrentStatePathDisabled.java | 100 +++++++++++++++++++++
.../task/TestTaskSchedulingTwoCurrentStates.java | 8 +-
11 files changed, 259 insertions(+), 35 deletions(-)
diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 6a73a7e..4fe651c 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -84,4 +84,7 @@ public class SystemPropertyKeys {
MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY;
public static final String STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED = "helix.StateUpdateUtil.errorLog.enabled";
+
+ public static final String TASK_CURRENT_STATE_PATH_DISABLED =
+ "helix.taskCurrentStatePathDisabled";
}
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index bdb0d0b..d32e90e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -38,6 +38,7 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
@@ -45,6 +46,7 @@ import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.task.TaskStateModel;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
@@ -86,6 +88,8 @@ public class HelixStateTransitionHandler extends MessageHandler {
private final CurrentState _currentStateDelta;
private final HelixManager _manager;
private final StateModelFactory<? extends StateModel> _stateModelFactory;
+ private final boolean _isTaskMessage;
+ private final boolean _isTaskCurrentStatePathDisabled;
volatile boolean _isTimeout = false;
public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
@@ -98,6 +102,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
_currentStateDelta = currentStateDelta;
_manager = _notificationContext.getManager();
_stateModelFactory = stateModelFactory;
+ _isTaskMessage = stateModel instanceof TaskStateModel;
+ _isTaskCurrentStatePathDisabled =
+ Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED);
}
void preHandleMessage() throws Exception {
@@ -137,8 +144,10 @@ public class HelixStateTransitionHandler extends MessageHandler {
String sessionId = _message.getTgtSessionId();
String resource = _message.getResourceName();
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
- PropertyKey key = accessor.keyBuilder().currentState(instance, sessionId, resource,
- bucketizer.getBucketName(partitionName));
+ PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? accessor.keyBuilder()
+ .taskCurrentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName))
+ : accessor.keyBuilder()
+ .currentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName));
ZNRecord rec = new ZNRecord(resource);
Map<String, String> map = new TreeMap<String, String>();
map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
@@ -264,8 +273,10 @@ public class HelixStateTransitionHandler extends MessageHandler {
try {
// Update the ZK current state of the node
- PropertyKey key = keyBuilder.currentState(instanceName, sessionId, resource,
- bucketizer.getBucketName(partitionKey));
+ PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? accessor.keyBuilder()
+ .taskCurrentState(instanceName, sessionId, resource,
+ bucketizer.getBucketName(partitionKey)) : accessor.keyBuilder()
+ .currentState(instanceName, sessionId, resource, bucketizer.getBucketName(partitionKey));
if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
// normal message
if (!accessor.updateProperty(key, _currentStateDelta)) {
@@ -438,9 +449,11 @@ public class HelixStateTransitionHandler extends MessageHandler {
disablePartition();
}
- if (!accessor.updateProperty(
- keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName),
- currentStateDelta)) {
+ PropertyKey currentStateKey =
+ _isTaskMessage && !_isTaskCurrentStatePathDisabled ? keyBuilder
+ .taskCurrentState(instanceName, _message.getTgtSessionId(), resourceName)
+ : keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName);
+ if (!accessor.updateProperty(currentStateKey, currentStateDelta)) {
logger.error("Fails to persist ERROR current state to ZK for resource " + resourceName
+ " partition: " + partition);
}
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 54566cc..71ef39f 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -52,6 +52,7 @@ import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.controller.GenericHelixController;
@@ -70,6 +71,7 @@ import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -830,6 +832,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
String sessionId = manager.getSessionId();
List<String> curResourceNames =
accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+ List<String> taskCurResourceNames =
+ accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId));
List<PropertyKey> createCurStateKeys = new ArrayList<>();
List<CurrentState> metaCurStates = new ArrayList<>();
Set<String> createCurStateNames = new HashSet<>();
@@ -908,10 +912,15 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
if (!message.isControlerMsg() && message.getMsgType()
.equals(Message.MessageType.STATE_TRANSITION.name())) {
String resourceName = message.getResourceName();
- if (!curResourceNames.contains(resourceName) && !createCurStateNames
- .contains(resourceName)) {
+ if (!curResourceNames.contains(resourceName) && !taskCurResourceNames.contains(resourceName)
+ && !createCurStateNames.contains(resourceName)) {
createCurStateNames.add(resourceName);
- createCurStateKeys.add(keyBuilder.currentState(instanceName, sessionId, resourceName));
+ PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName);
+ if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean
+ .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
+ curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName);
+ }
+ createCurStateKeys.add(curStateKey);
CurrentState metaCurState = new CurrentState(resourceName);
metaCurState.setBucketSize(message.getBucketSize());
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index addadc8..68017c3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -22,6 +22,7 @@ package org.apache.helix.task;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.model.CurrentState;
import org.apache.helix.task.TaskResult.Status;
import org.slf4j.Logger;
@@ -208,7 +209,10 @@ public class TaskRunner implements Runnable {
String.format("Requesting a state transition to %s for partition %s.", state, partition));
try {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
+ PropertyKey key =
+ Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED) ? keyBuilder
+ .currentState(instance, sessionId, resource)
+ : keyBuilder.taskCurrentState(instance, sessionId, resource);
CurrentState currStateDelta = new CurrentState(resource);
currStateDelta.setRequestedState(partition, state.name());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
index 5503067..e8f4f82 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -20,14 +20,17 @@ package org.apache.helix.integration.spectator;
*/
import java.lang.management.ManagementFactory;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
@@ -35,12 +38,14 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
@@ -49,7 +54,16 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
+import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
@@ -77,9 +91,15 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
}
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+ taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
+
for (int i = 0; i < NUM_NODES; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+ new TaskStateModelFactory(_participants[i], taskFactoryReg));
_participants[i].syncStart();
}
@@ -117,6 +137,61 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
}
@Test
+ public void testCurrentStatesRoutingTableIgnoreTaskCurrentStates() throws Exception {
+ FlaggedCurrentStateRoutingTableProvider routingTableCurrentStates =
+ new FlaggedCurrentStateRoutingTableProvider(_manager);
+ Assert.assertFalse(routingTableCurrentStates.isOnStateChangeTriggered());
+
+ try {
+ TaskDriver taskDriver = new TaskDriver(_manager);
+ String workflowName1 = TestHelper.getTestMethodName() + "_1";
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(NUM_NODES)
+ .setNumConcurrentTasksPerInstance(1).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder);
+ taskDriver.start(workflowBuilder1.build());
+ taskDriver
+ .pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+ TaskState.COMPLETED);
+
+ Assert.assertFalse(routingTableCurrentStates.isOnStateChangeTriggered());
+
+ // Disable the task current path and the routing table provider should be notified
+ System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "true");
+ String workflowName2 = TestHelper.getTestMethodName() + "_2";
+ Workflow.Builder workflowBuilder2 =
+ new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder);
+ taskDriver.start(workflowBuilder2.build());
+ taskDriver
+ .pollForJobState(workflowName2, TaskUtil.getNamespacedJobName(workflowName2, jobName),
+ TaskState.COMPLETED);
+
+ Assert.assertTrue(routingTableCurrentStates.isOnStateChangeTriggered());
+ System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "false");
+
+ String dbName = "testDB";
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, dbName, NUM_PARTITIONS, "MasterSlave",
+ IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, NUM_REPLICAS);
+
+ ZkHelixClusterVerifier clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+ .build();
+ Assert.assertTrue(clusterVerifier.verifyByPolling());
+ Assert.assertTrue(routingTableCurrentStates.isOnStateChangeTriggered());
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, dbName);
+ } finally {
+ routingTableCurrentStates.shutdown();
+ }
+ }
+
+ @Test (dependsOnMethods = "testCurrentStatesRoutingTableIgnoreTaskCurrentStates")
public void testRoutingTableWithCurrentStates() throws Exception {
RoutingTableProvider routingTableEV =
new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
@@ -365,4 +440,24 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
super.onLiveInstanceChange(liveInstances, changeContext);
}
}
+
+ static class FlaggedCurrentStateRoutingTableProvider extends RoutingTableProvider {
+ private boolean onStateChangeTriggered = false;
+
+ public FlaggedCurrentStateRoutingTableProvider(HelixManager manager) {
+ super(manager, PropertyType.CURRENTSTATES);
+ }
+
+ public boolean isOnStateChangeTriggered() {
+ return onStateChangeTriggered;
+ }
+
+ @Override
+ @PreFetch(enabled = false)
+ public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+ NotificationContext changeContext) {
+ onStateChangeTriggered = true;
+ super.onStateChange(instanceName, statesInfo, changeContext);
+ }
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java
index 8a19904..346dd7a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java
@@ -52,11 +52,11 @@ public class TestCurrentStateDropWithoutConfigs extends TaskTestBase {
currentState.setStartTime(taskName, System.currentTimeMillis());
currentState.setEndTime(taskName, System.currentTimeMillis());
_accessor.setProperty(_accessor.keyBuilder()
- .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
+ .taskCurrentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
jobName), currentState);
Assert.assertTrue(TestHelper.verify(() -> _accessor.getProperty(_accessor.keyBuilder()
- .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
+ .taskCurrentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
jobName)) == null, TestHelper.WAIT_DURATION * 10));
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
index c932634..f8f0cd1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
@@ -98,8 +98,8 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
String instanceP2 = PARTICIPANT_PREFIX + "_" + (_startPort + 2);
ZkClient clientP2 = (ZkClient) _participants[2].getZkClient();
String sessionIdP2 = ZkTestHelper.getSessionId(clientP2);
- String currentStatePathP2 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP2 + "/CURRENTSTATES/"
- + sessionIdP2 + "/" + namespacedJobName;
+ String currentStatePathP2 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP2, sessionIdP2, namespacedJobName).toString();
Assert
.assertTrue(
@@ -113,14 +113,14 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
- String currentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/"
- + sessionIdP0 + "/" + namespacedJobName;
+ String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString();
String instanceP1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
ZkClient clientP1 = (ZkClient) _participants[1].getZkClient();
String sessionIdP1 = ZkTestHelper.getSessionId(clientP1);
- String currentStatePathP1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP1 + "/CURRENTSTATES/"
- + sessionIdP1 + "/" + namespacedJobName;
+ String currentStatePathP1 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP1, sessionIdP1, namespacedJobName).toString();
ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(currentStatePathP2,
new Stat(), AccessOption.PERSISTENT);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
index 627a7b9..8ca89e1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
@@ -88,10 +88,10 @@ public class TestTaskCurrentStateDrop extends TaskTestBase {
String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
- String taskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
- + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName;
- String dataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
- + "/CURRENTSTATES/" + sessionIdP0 + "/" + DATABASE;
+ String taskCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString();
+ String dataBaseCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+ .currentState(instanceP0, sessionIdP0, DATABASE).toString();
// Read the current states of Participant0 and make sure they been created
boolean isCurrentStateCreated = TestHelper.verify(() -> {
@@ -113,10 +113,10 @@ public class TestTaskCurrentStateDrop extends TaskTestBase {
clientP0 = (ZkClient) _participants[0].getZkClient();
String newSessionIdP0 = ZkTestHelper.getSessionId(clientP0);
- String newTaskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
- + "/CURRENTSTATES/" + newSessionIdP0 + "/" + namespacedJobName;
- String newDataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
- + "/CURRENTSTATES/" + newSessionIdP0 + "/" + DATABASE;
+ String newTaskCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP0, newSessionIdP0, namespacedJobName).toString();
+ String newDataBaseCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+ .currentState(instanceP0, newSessionIdP0, DATABASE).toString();
boolean isCurrentStateExpected = TestHelper.verify(() -> {
ZNRecord taskRecord = _manager.getHelixDataAccessor().getBaseDataAccessor()
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
index 720e7a3..1efb01e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
@@ -88,10 +88,10 @@ public class TestTaskCurrentStateNull extends TaskTestBase {
String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
- String jobCurrentStatePath1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
- + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName1;
- String jobCurrentStatePath2 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
- + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName2;
+ String jobCurrentStatePath1 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName1).toString();
+ String jobCurrentStatePath2 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName2).toString();
// Read the current states of Participant0 and make sure they have been created
boolean isCurrentStateCreated = TestHelper.verify(() -> {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java
new file mode 100644
index 0000000..8882a09
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java
@@ -0,0 +1,100 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * This test makes sure that the Current State of the task are being removed after participant
+ * handles new session.
+ */
+public class TestTaskCurrentStatePathDisabled extends TaskTestBase {
+ private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+ protected HelixDataAccessor _accessor;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _numPartitions = 1;
+ _numNodes = 1;
+ super.beforeClass();
+ }
+
+ @Test
+ public void testTaskCurrentStatePathDisabled() throws Exception {
+ String jobQueueName0 = TestHelper.getTestMethodName() + "_0";
+ JobConfig.Builder jobBuilder0 =
+ new JobConfig.Builder().setWorkflow(jobQueueName0).setTargetResource(DATABASE)
+ .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+ .setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+ JobQueue.Builder jobQueue0 = TaskTestUtil.buildJobQueue(jobQueueName0);
+ jobQueue0.enqueueJob("JOB0", jobBuilder0);
+
+ _driver.start(jobQueue0.build());
+ String namespacedJobName0 = TaskUtil.getNamespacedJobName(jobQueueName0, "JOB0");
+ _driver.pollForJobState(jobQueueName0, namespacedJobName0, TaskState.IN_PROGRESS);
+
+ // Get the current states of Participant0
+ String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+ ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+ String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+ PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+ Assert.assertNotNull(_manager.getHelixDataAccessor()
+ .getProperty(keyBuilder.taskCurrentState(instanceP0, sessionIdP0, namespacedJobName0)));
+ Assert.assertNull(_manager.getHelixDataAccessor()
+ .getProperty(keyBuilder.currentState(instanceP0, sessionIdP0, namespacedJobName0)));
+
+ // Test the case when the task current state path is disabled
+ String jobQueueName1 = TestHelper.getTestMethodName() + "_1";
+ JobConfig.Builder jobBuilder1 =
+ new JobConfig.Builder().setWorkflow(jobQueueName1).setTargetResource(DATABASE)
+ .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+ .setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+ JobQueue.Builder jobQueue1 = TaskTestUtil.buildJobQueue(jobQueueName1);
+ jobQueue1.enqueueJob("JOB1", jobBuilder1);
+
+ System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "true");
+ _driver.start(jobQueue1.build());
+ String namespacedJobName1 = TaskUtil.getNamespacedJobName(jobQueueName1, "JOB1");
+ _driver.pollForJobState(jobQueueName1, namespacedJobName1, TaskState.IN_PROGRESS);
+ Assert.assertNull(_manager.getHelixDataAccessor()
+ .getProperty(keyBuilder.taskCurrentState(instanceP0, sessionIdP0, namespacedJobName1)));
+ Assert.assertNotNull(_manager.getHelixDataAccessor()
+ .getProperty(keyBuilder.currentState(instanceP0, sessionIdP0, namespacedJobName1)));
+ System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "false");
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
index bb970c7..ada5157 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
@@ -158,15 +158,15 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
- String currentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/"
- + sessionIdP0 + "/" + namespacedJobName;
+ String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString();
// Get the current state of Participant1
String instanceP1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
ZkClient clientP1 = (ZkClient) _participants[1].getZkClient();
String sessionIdP1 = ZkTestHelper.getSessionId(clientP1);
- String currentStatePathP1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP1 + "/CURRENTSTATES/"
- + sessionIdP1 + "/" + namespacedJobName;
+ String currentStatePathP1 = _manager.getHelixDataAccessor().keyBuilder()
+ .taskCurrentState(instanceP1, sessionIdP1, namespacedJobName).toString();
boolean isCurrentStateCreated = TestHelper.verify(() -> {
ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()