You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/12/21 23:00:37 UTC

[helix] 03/03: Participant-side Task Current State Migration (#1584)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 361e18da4152c0146daa9d9dc7929f1f2bdcd9dc
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Tue Dec 15 14:26:53 2020 -0800

    Participant-side Task Current State Migration (#1584)
    
    The second part of the task current state migration.
    All changes made in this commit are on the participant side.
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |   3 +
 .../handling/HelixStateTransitionHandler.java      |  27 ++++--
 .../messaging/handling/HelixTaskExecutor.java      |  15 +++-
 .../java/org/apache/helix/task/TaskRunner.java     |   6 +-
 .../TestRoutingTableProviderFromCurrentStates.java |  95 ++++++++++++++++++++
 .../task/TestCurrentStateDropWithoutConfigs.java   |   4 +-
 .../task/TestDropCurrentStateRunningTask.java      |  12 +--
 .../integration/task/TestTaskCurrentStateDrop.java |  16 ++--
 .../integration/task/TestTaskCurrentStateNull.java |   8 +-
 .../task/TestTaskCurrentStatePathDisabled.java     | 100 +++++++++++++++++++++
 .../task/TestTaskSchedulingTwoCurrentStates.java   |   8 +-
 11 files changed, 259 insertions(+), 35 deletions(-)

diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 6a73a7e..4fe651c 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -84,4 +84,7 @@ public class SystemPropertyKeys {
       MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY;
 
   public static final String STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED = "helix.StateUpdateUtil.errorLog.enabled";
+
+  public static final String TASK_CURRENT_STATE_PATH_DISABLED =
+      "helix.taskCurrentStatePathDisabled";
 }
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index bdb0d0b..d32e90e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -38,6 +38,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
@@ -45,6 +46,7 @@ import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.task.TaskStateModel;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
@@ -86,6 +88,8 @@ public class HelixStateTransitionHandler extends MessageHandler {
   private final CurrentState _currentStateDelta;
   private final HelixManager _manager;
   private final StateModelFactory<? extends StateModel> _stateModelFactory;
+  private final boolean _isTaskMessage;
+  private final boolean _isTaskCurrentStatePathDisabled;
   volatile boolean _isTimeout = false;
 
   public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
@@ -98,6 +102,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
     _currentStateDelta = currentStateDelta;
     _manager = _notificationContext.getManager();
     _stateModelFactory = stateModelFactory;
+    _isTaskMessage = stateModel instanceof TaskStateModel;
+    _isTaskCurrentStatePathDisabled =
+        Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED);
   }
 
   void preHandleMessage() throws Exception {
@@ -137,8 +144,10 @@ public class HelixStateTransitionHandler extends MessageHandler {
       String sessionId = _message.getTgtSessionId();
       String resource = _message.getResourceName();
       ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
-      PropertyKey key = accessor.keyBuilder().currentState(instance, sessionId, resource,
-          bucketizer.getBucketName(partitionName));
+      PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? accessor.keyBuilder()
+          .taskCurrentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName))
+          : accessor.keyBuilder()
+              .currentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName));
       ZNRecord rec = new ZNRecord(resource);
       Map<String, String> map = new TreeMap<String, String>();
       map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
@@ -264,8 +273,10 @@ public class HelixStateTransitionHandler extends MessageHandler {
 
     try {
       // Update the ZK current state of the node
-      PropertyKey key = keyBuilder.currentState(instanceName, sessionId, resource,
-          bucketizer.getBucketName(partitionKey));
+      PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? accessor.keyBuilder()
+          .taskCurrentState(instanceName, sessionId, resource,
+              bucketizer.getBucketName(partitionKey)) : accessor.keyBuilder()
+          .currentState(instanceName, sessionId, resource, bucketizer.getBucketName(partitionKey));
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
         // normal message
         if (!accessor.updateProperty(key, _currentStateDelta)) {
@@ -438,9 +449,11 @@ public class HelixStateTransitionHandler extends MessageHandler {
           disablePartition();
         }
 
-        if (!accessor.updateProperty(
-            keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName),
-            currentStateDelta)) {
+        PropertyKey currentStateKey =
+            _isTaskMessage && !_isTaskCurrentStatePathDisabled ? keyBuilder
+                .taskCurrentState(instanceName, _message.getTgtSessionId(), resourceName)
+                : keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName);
+        if (!accessor.updateProperty(currentStateKey, currentStateDelta)) {
           logger.error("Fails to persist ERROR current state to ZK for resource " + resourceName
               + " partition: " + partition);
         }
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 54566cc..71ef39f 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -52,6 +52,7 @@ import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.controller.GenericHelixController;
@@ -70,6 +71,7 @@ import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -830,6 +832,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     String sessionId = manager.getSessionId();
     List<String> curResourceNames =
         accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+    List<String> taskCurResourceNames =
+        accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId));
     List<PropertyKey> createCurStateKeys = new ArrayList<>();
     List<CurrentState> metaCurStates = new ArrayList<>();
     Set<String> createCurStateNames = new HashSet<>();
@@ -908,10 +912,15 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       if (!message.isControlerMsg() && message.getMsgType()
           .equals(Message.MessageType.STATE_TRANSITION.name())) {
         String resourceName = message.getResourceName();
-        if (!curResourceNames.contains(resourceName) && !createCurStateNames
-            .contains(resourceName)) {
+        if (!curResourceNames.contains(resourceName) && !taskCurResourceNames.contains(resourceName)
+            && !createCurStateNames.contains(resourceName)) {
           createCurStateNames.add(resourceName);
-          createCurStateKeys.add(keyBuilder.currentState(instanceName, sessionId, resourceName));
+          PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName);
+          if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean
+              .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
+            curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName);
+          }
+          createCurStateKeys.add(curStateKey);
 
           CurrentState metaCurState = new CurrentState(resourceName);
           metaCurState.setBucketSize(message.getBucketSize());
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index addadc8..68017c3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -22,6 +22,7 @@ package org.apache.helix.task;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.task.TaskResult.Status;
 import org.slf4j.Logger;
@@ -208,7 +209,10 @@ public class TaskRunner implements Runnable {
         String.format("Requesting a state transition to %s for partition %s.", state, partition));
     try {
       PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-      PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
+      PropertyKey key =
+          Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED) ? keyBuilder
+              .currentState(instance, sessionId, resource)
+              : keyBuilder.taskCurrentState(instance, sessionId, resource);
       CurrentState currStateDelta = new CurrentState(resource);
       currStateDelta.setRequestedState(partition, state.name());
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
index 5503067..e8f4f82 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -20,14 +20,17 @@ package org.apache.helix.integration.spectator;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -35,12 +38,14 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
@@ -49,7 +54,16 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
+import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -77,9 +91,15 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
       _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
 
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
+
     for (int i = 0; i < NUM_NODES; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
       _participants[i].syncStart();
     }
 
@@ -117,6 +137,61 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
   }
 
   @Test
+  public void testCurrentStatesRoutingTableIgnoreTaskCurrentStates() throws Exception {
+    FlaggedCurrentStateRoutingTableProvider routingTableCurrentStates =
+        new FlaggedCurrentStateRoutingTableProvider(_manager);
+    Assert.assertFalse(routingTableCurrentStates.isOnStateChangeTriggered());
+
+    try {
+      TaskDriver taskDriver = new TaskDriver(_manager);
+      String workflowName1 = TestHelper.getTestMethodName() + "_1";
+      String jobName = "JOB0";
+
+      JobConfig.Builder jobBuilder =
+          new JobConfig.Builder().setWorkflow(workflowName1).setNumberOfTasks(NUM_NODES)
+              .setNumConcurrentTasksPerInstance(1).setCommand(MockTask.TASK_COMMAND)
+              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+      Workflow.Builder workflowBuilder1 =
+          new Workflow.Builder(workflowName1).addJob(jobName, jobBuilder);
+      taskDriver.start(workflowBuilder1.build());
+      taskDriver
+          .pollForJobState(workflowName1, TaskUtil.getNamespacedJobName(workflowName1, jobName),
+              TaskState.COMPLETED);
+
+      Assert.assertFalse(routingTableCurrentStates.isOnStateChangeTriggered());
+
+      // Disable the task current state path; the routing table provider should now be notified
+      System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "true");
+      String workflowName2 = TestHelper.getTestMethodName() + "_2";
+      Workflow.Builder workflowBuilder2 =
+          new Workflow.Builder(workflowName2).addJob(jobName, jobBuilder);
+      taskDriver.start(workflowBuilder2.build());
+      taskDriver
+          .pollForJobState(workflowName2, TaskUtil.getNamespacedJobName(workflowName2, jobName),
+              TaskState.COMPLETED);
+
+      Assert.assertTrue(routingTableCurrentStates.isOnStateChangeTriggered());
+      System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "false");
+
+      String dbName = "testDB";
+      _gSetupTool.addResourceToCluster(CLUSTER_NAME, dbName, NUM_PARTITIONS, "MasterSlave",
+          IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, NUM_REPLICAS);
+
+      ZkHelixClusterVerifier clusterVerifier =
+          new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
+              .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+              .build();
+      Assert.assertTrue(clusterVerifier.verifyByPolling());
+      Assert.assertTrue(routingTableCurrentStates.isOnStateChangeTriggered());
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, dbName);
+    } finally {
+      routingTableCurrentStates.shutdown();
+    }
+  }
+
+  @Test (dependsOnMethods = "testCurrentStatesRoutingTableIgnoreTaskCurrentStates")
   public void testRoutingTableWithCurrentStates() throws Exception {
     RoutingTableProvider routingTableEV =
         new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
@@ -365,4 +440,24 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
       super.onLiveInstanceChange(liveInstances, changeContext);
     }
   }
+
+  static class FlaggedCurrentStateRoutingTableProvider extends RoutingTableProvider {
+    private boolean onStateChangeTriggered = false;
+
+    public FlaggedCurrentStateRoutingTableProvider(HelixManager manager) {
+      super(manager, PropertyType.CURRENTSTATES);
+    }
+
+    public boolean isOnStateChangeTriggered() {
+      return onStateChangeTriggered;
+    }
+
+    @Override
+    @PreFetch(enabled = false)
+    public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+        NotificationContext changeContext) {
+      onStateChangeTriggered = true;
+      super.onStateChange(instanceName, statesInfo, changeContext);
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java
index 8a19904..346dd7a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestCurrentStateDropWithoutConfigs.java
@@ -52,11 +52,11 @@ public class TestCurrentStateDropWithoutConfigs extends TaskTestBase {
     currentState.setStartTime(taskName, System.currentTimeMillis());
     currentState.setEndTime(taskName, System.currentTimeMillis());
     _accessor.setProperty(_accessor.keyBuilder()
-        .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
+        .taskCurrentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
             jobName), currentState);
 
     Assert.assertTrue(TestHelper.verify(() -> _accessor.getProperty(_accessor.keyBuilder()
-        .currentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
+        .taskCurrentState(_participants[0].getInstanceName(), liveInstance.getEphemeralOwner(),
             jobName)) == null, TestHelper.WAIT_DURATION * 10));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
index c932634..f8f0cd1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
@@ -98,8 +98,8 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
     String instanceP2 = PARTICIPANT_PREFIX + "_" + (_startPort + 2);
     ZkClient clientP2 = (ZkClient) _participants[2].getZkClient();
     String sessionIdP2 = ZkTestHelper.getSessionId(clientP2);
-    String currentStatePathP2 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP2 + "/CURRENTSTATES/"
-        + sessionIdP2 + "/" + namespacedJobName;
+    String currentStatePathP2 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP2, sessionIdP2, namespacedJobName).toString();
 
     Assert
         .assertTrue(
@@ -113,14 +113,14 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
     String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
     ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
     String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
-    String currentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/"
-        + sessionIdP0 + "/" + namespacedJobName;
+    String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString();
 
     String instanceP1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
     ZkClient clientP1 = (ZkClient) _participants[1].getZkClient();
     String sessionIdP1 = ZkTestHelper.getSessionId(clientP1);
-    String currentStatePathP1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP1 + "/CURRENTSTATES/"
-        + sessionIdP1 + "/" + namespacedJobName;
+    String currentStatePathP1 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP1, sessionIdP1, namespacedJobName).toString();
 
     ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(currentStatePathP2,
         new Stat(), AccessOption.PERSISTENT);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
index 627a7b9..8ca89e1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateDrop.java
@@ -88,10 +88,10 @@ public class TestTaskCurrentStateDrop extends TaskTestBase {
     String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
     ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
     String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
-    String taskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
-        + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName;
-    String dataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
-        + "/CURRENTSTATES/" + sessionIdP0 + "/" + DATABASE;
+    String taskCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString();
+    String dataBaseCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+        .currentState(instanceP0, sessionIdP0, DATABASE).toString();
 
     // Read the current states of Participant0 and make sure they been created
     boolean isCurrentStateCreated = TestHelper.verify(() -> {
@@ -113,10 +113,10 @@ public class TestTaskCurrentStateDrop extends TaskTestBase {
 
     clientP0 = (ZkClient) _participants[0].getZkClient();
     String newSessionIdP0 = ZkTestHelper.getSessionId(clientP0);
-    String newTaskCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
-        + "/CURRENTSTATES/" + newSessionIdP0 + "/" + namespacedJobName;
-    String newDataBaseCurrentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
-        + "/CURRENTSTATES/" + newSessionIdP0 + "/" + DATABASE;
+    String newTaskCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP0, newSessionIdP0, namespacedJobName).toString();
+    String newDataBaseCurrentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+        .currentState(instanceP0, newSessionIdP0, DATABASE).toString();
 
     boolean isCurrentStateExpected = TestHelper.verify(() -> {
       ZNRecord taskRecord = _manager.getHelixDataAccessor().getBaseDataAccessor()
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
index 720e7a3..1efb01e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
@@ -88,10 +88,10 @@ public class TestTaskCurrentStateNull extends TaskTestBase {
     String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
     ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
     String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
-    String jobCurrentStatePath1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
-        + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName1;
-    String jobCurrentStatePath2 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
-        + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName2;
+    String jobCurrentStatePath1 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName1).toString();
+    String jobCurrentStatePath2 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName2).toString();
 
     // Read the current states of Participant0 and make sure they have been created
     boolean isCurrentStateCreated = TestHelper.verify(() -> {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java
new file mode 100644
index 0000000..8882a09
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStatePathDisabled.java
@@ -0,0 +1,100 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * This test makes sure that task current states are written to the task current state path by
+ * default, and to the regular current state path once the task current state path is disabled.
+ */
+public class TestTaskCurrentStatePathDisabled extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testTaskCurrentStatePathDisabled() throws Exception {
+    String jobQueueName0 = TestHelper.getTestMethodName() + "_0";
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName0).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+    JobQueue.Builder jobQueue0 = TaskTestUtil.buildJobQueue(jobQueueName0);
+    jobQueue0.enqueueJob("JOB0", jobBuilder0);
+
+    _driver.start(jobQueue0.build());
+    String namespacedJobName0 = TaskUtil.getNamespacedJobName(jobQueueName0, "JOB0");
+    _driver.pollForJobState(jobQueueName0, namespacedJobName0, TaskState.IN_PROGRESS);
+
+    // Get the current states of Participant0
+    String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+    String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+    PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+    Assert.assertNotNull(_manager.getHelixDataAccessor()
+        .getProperty(keyBuilder.taskCurrentState(instanceP0, sessionIdP0, namespacedJobName0)));
+    Assert.assertNull(_manager.getHelixDataAccessor()
+        .getProperty(keyBuilder.currentState(instanceP0, sessionIdP0, namespacedJobName0)));
+
+    // Test the case when the task current state path is disabled
+    String jobQueueName1 = TestHelper.getTestMethodName() + "_1";
+    JobConfig.Builder jobBuilder1 =
+        new JobConfig.Builder().setWorkflow(jobQueueName1).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+    JobQueue.Builder jobQueue1 = TaskTestUtil.buildJobQueue(jobQueueName1);
+    jobQueue1.enqueueJob("JOB1", jobBuilder1);
+
+    System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "true");
+    _driver.start(jobQueue1.build());
+    String namespacedJobName1 = TaskUtil.getNamespacedJobName(jobQueueName1, "JOB1");
+    _driver.pollForJobState(jobQueueName1, namespacedJobName1, TaskState.IN_PROGRESS);
+    Assert.assertNull(_manager.getHelixDataAccessor()
+        .getProperty(keyBuilder.taskCurrentState(instanceP0, sessionIdP0, namespacedJobName1)));
+    Assert.assertNotNull(_manager.getHelixDataAccessor()
+        .getProperty(keyBuilder.currentState(instanceP0, sessionIdP0, namespacedJobName1)));
+    System.setProperty(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED, "false");
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
index bb970c7..ada5157 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
@@ -158,15 +158,15 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
     String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
     ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
     String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
-    String currentStatePathP0 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/"
-        + sessionIdP0 + "/" + namespacedJobName;
+    String currentStatePathP0 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP0, sessionIdP0, namespacedJobName).toString();
 
     // Get the current state of Participant1
     String instanceP1 = PARTICIPANT_PREFIX + "_" + (_startPort + 1);
     ZkClient clientP1 = (ZkClient) _participants[1].getZkClient();
     String sessionIdP1 = ZkTestHelper.getSessionId(clientP1);
-    String currentStatePathP1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP1 + "/CURRENTSTATES/"
-        + sessionIdP1 + "/" + namespacedJobName;
+    String currentStatePathP1 = _manager.getHelixDataAccessor().keyBuilder()
+        .taskCurrentState(instanceP1, sessionIdP1, namespacedJobName).toString();
 
     boolean isCurrentStateCreated = TestHelper.verify(() -> {
       ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()