You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2021/07/16 21:03:16 UTC

[helix] 09/13: Implement Participant Freeze Process (#1812)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0aca85a6b867594900b801a173c2159f437d2f7a
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Wed Jul 14 12:22:19 2021 -0700

    Implement Participant Freeze Process (#1812)
    
    After the controller sends a freeze message to a participant, the participant needs to process the message and execute the freeze request. It marks the live instance as "frozen" both internally in memory and in the live instance znode.
    
    If the session changes during a freeze, handling the new session does nothing except create the live instance znode.
    To unfreeze: if the session changed during the freeze period, the state model states are synced and the current states are carried over. Then the live instance status field is removed.
---
 .../helix/manager/zk/CurStateCarryOverUpdater.java |  19 +-
 .../helix/manager/zk/ParticipantManager.java       |  79 ++++--
 .../apache/helix/manager/zk/ZKHelixManager.java    |  47 +++-
 .../messaging/handling/HelixTaskExecutor.java      | 115 ++++++--
 .../messaging/handling/MessageHandlerFactory.java  |  10 +
 .../helix/participant/HelixStateMachineEngine.java |  27 +-
 .../helix/participant/statemachine/StateModel.java |   7 +
 .../paticipant/TestParticipantFreeze.java          | 291 +++++++++++++++++++++
 8 files changed, 541 insertions(+), 54 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
index a00c337..f62aa65 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
@@ -19,10 +19,10 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import java.util.Map;
+
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.task.TaskConstants;
-import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 
 
@@ -35,16 +35,17 @@ import org.apache.helix.zookeeper.zkclient.DataUpdater;
  */
 class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> {
   final String _curSessionId;
-  final String _initState;
+  final Map<String, String> _expectedStateMap;
   final CurrentState _lastCurState;
 
-  public CurStateCarryOverUpdater(String curSessionId, String initState, CurrentState lastCurState) {
-    if (curSessionId == null || initState == null || lastCurState == null) {
+  public CurStateCarryOverUpdater(String curSessionId, Map<String, String> expectedStateMap,
+      CurrentState lastCurState) {
+    if (curSessionId == null || lastCurState == null) {
       throw new IllegalArgumentException(
-          "missing curSessionId|initState|lastCurState for carry-over");
+          "missing curSessionId|lastCurState for carry-over");
     }
     _curSessionId = curSessionId;
-    _initState = initState;
+    _expectedStateMap = expectedStateMap;
     _lastCurState = lastCurState;
   }
 
@@ -63,7 +64,7 @@ class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> {
     for (String partitionName : _lastCurState.getPartitionStateMap().keySet()) {
         // carry-over only when current-state does not exist for regular Helix resource partitions
         if (curState.getState(partitionName) == null) {
-          curState.setState(partitionName, _initState);
+          curState.setState(partitionName, _expectedStateMap.get(partitionName));
         }
     }
     return curState.getRecord();
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 6e071f6..b7a0014 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperty;
@@ -52,12 +54,15 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
+import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
@@ -153,7 +158,10 @@ public class ParticipantManager {
     // Live instance creation also checks if the expected session is valid or not. Live instance
     // should not be created by an expired zk session.
     createLiveInstance();
-    carryOverPreviousCurrentState();
+    if (shouldCarryOver()) {
+      carryOverPreviousCurrentState(_dataAccessor, _instanceName, _sessionId,
+          _manager.getStateMachineEngine(), true);
+    }
     removePreviousTaskCurrentStates();
 
     /**
@@ -162,6 +170,17 @@ public class ParticipantManager {
     setupMsgHandler();
   }
 
+  private boolean shouldCarryOver() {
+    if (_liveInstanceInfoProvider == null
+        || _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo() == null) {
+      return true;
+    }
+    String status = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo()
+        .getSimpleField(LiveInstance.LiveInstanceProperty.STATUS.name());
+    // If frozen, no carry-over
+    return !LiveInstance.LiveInstanceStatus.PAUSED.name().equals(status);
+  }
+
   private void joinCluster() {
     // Read cluster config and see if an instance can auto join or auto register to the cluster
     boolean autoJoin = false;
@@ -339,21 +358,24 @@ public class ParticipantManager {
    * carry over current-states from last sessions
    * set to initial state for current session only when state doesn't exist in current session
    */
-  private void carryOverPreviousCurrentState() {
-    List<String> sessions = _dataAccessor.getChildNames(_keyBuilder.sessions(_instanceName));
+  public static synchronized void carryOverPreviousCurrentState(HelixDataAccessor dataAccessor,
+      String instanceName, String sessionId, StateMachineEngine stateMachineEngine,
+      boolean setToInitState) {
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+    List<String> sessions = dataAccessor.getChildNames(keyBuilder.sessions(instanceName));
 
     for (String session : sessions) {
-      if (session.equals(_sessionId)) {
+      if (session.equals(sessionId)) {
         continue;
       }
 
       // Ignore if any current states in the previous folder cannot be read.
       List<CurrentState> lastCurStates =
-          _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session), false);
+          dataAccessor.getChildValues(keyBuilder.currentStates(instanceName, session), false);
 
       for (CurrentState lastCurState : lastCurStates) {
         LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
-            + " to current session: " + _sessionId);
+            + " to current session: " + sessionId + ", setToInitState: " + setToInitState);
         String stateModelDefRef = lastCurState.getStateModelDefRef();
         if (stateModelDefRef == null) {
           LOG.error(
@@ -368,21 +390,37 @@ public class ParticipantManager {
           continue;
         }
 
-        StateModelDefinition stateModel =
-            _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
+        StateModelDefinition stateModelDef =
+            dataAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
+        String initState = stateModelDef.getInitialState();
+        Map<String, String> partitionExpectedStateMap = new HashMap<>();
+        if (setToInitState) {
+          lastCurState.getPartitionStateMap().keySet()
+              .forEach(partition -> partitionExpectedStateMap.put(partition, initState));
+        } else {
+          String factoryName = lastCurState.getStateModelFactoryName();
+          StateModelFactory<? extends StateModel> stateModelFactory =
+              stateMachineEngine.getStateModelFactory(stateModelDefRef, factoryName);
+          lastCurState.getPartitionStateMap().keySet().forEach(partition -> {
+            StateModel stateModel =
+                stateModelFactory.getStateModel(lastCurState.getResourceName(), partition);
+            if (stateModel != null) {
+              partitionExpectedStateMap.put(partition, stateModel.getCurrentState());
+            }
+          });
+        }
 
-        BaseDataAccessor<ZNRecord> baseAccessor = _dataAccessor.getBaseDataAccessor();
+        BaseDataAccessor<ZNRecord> baseAccessor = dataAccessor.getBaseDataAccessor();
         String curStatePath =
-            _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
+            keyBuilder.currentState(instanceName, sessionId, lastCurState.getResourceName())
                 .getPath();
 
-        String initState = stateModel.getInitialState();
         if (lastCurState.getBucketSize() > 0) {
           // update parent node
           ZNRecord metaRecord = new ZNRecord(lastCurState.getId());
           metaRecord.setSimpleFields(lastCurState.getRecord().getSimpleFields());
           DataUpdater<ZNRecord> metaRecordUpdater =
-              new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(metaRecord));
+              new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, new CurrentState(metaRecord));
           boolean success =
               baseAccessor.update(curStatePath, metaRecordUpdater, AccessOption.PERSISTENT);
           if (success) {
@@ -394,7 +432,7 @@ public class ParticipantManager {
             List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
             for (String bucketName : map.keySet()) {
               paths.add(curStatePath + "/" + bucketName);
-              updaters.add(new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(map
+              updaters.add(new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, new CurrentState(map
                   .get(bucketName))));
             }
 
@@ -402,8 +440,8 @@ public class ParticipantManager {
           }
 
         } else {
-          _dataAccessor.getBaseDataAccessor().update(curStatePath,
-              new CurStateCarryOverUpdater(_sessionId, initState, lastCurState),
+          dataAccessor.getBaseDataAccessor().update(curStatePath,
+              new CurStateCarryOverUpdater(sessionId, partitionExpectedStateMap, lastCurState),
               AccessOption.PERSISTENT);
         }
       }
@@ -413,13 +451,16 @@ public class ParticipantManager {
      * remove previous current state parent nodes
      */
     for (String session : sessions) {
-      if (session.equals(_sessionId)) {
+      if (session.equals(sessionId)) {
         continue;
       }
 
-      String path = _keyBuilder.currentStates(_instanceName, session).getPath();
-      LOG.info("Removing current states from previous sessions. path: " + path);
-      _zkclient.deleteRecursively(path);
+      PropertyKey currentStatesProperty = keyBuilder.currentStates(instanceName, session);
+      String path = currentStatesProperty.getPath();
+      LOG.info("Removing current states from previous sessions. path: {}", path);
+      if (!dataAccessor.removeProperty(currentStatesProperty)) {
+        throw new ZkClientException("Failed to delete " + path);
+      }
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index ef8e733..4b88f74 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -1317,6 +1317,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     LOG.info("Handle new session, instance: {}, type: {}, session id: {}.", _instanceName,
         _instanceType,  sessionId);
 
+    // Will only create live instance
+    LiveInstance.LiveInstanceStatus liveInstanceStatus =
+        _messagingService.getExecutor().getLiveInstanceStatus();
+    if (LiveInstance.LiveInstanceStatus.PAUSED
+        .equals(liveInstanceStatus)) {
+      handleNewSessionInManagementMode(sessionId, liveInstanceStatus);
+      return;
+    }
+
     /**
      * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
      * disconnect if fail to cleanup
@@ -1343,13 +1352,13 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     switch (_instanceType) {
     case PARTICIPANT:
-      handleNewSessionAsParticipant(sessionId);
+      handleNewSessionAsParticipant(sessionId, _liveInstanceInfoProvider);
       break;
     case CONTROLLER:
       handleNewSessionAsController();
       break;
     case CONTROLLER_PARTICIPANT:
-      handleNewSessionAsParticipant(sessionId);
+      handleNewSessionAsParticipant(sessionId, _liveInstanceInfoProvider);
       handleNewSessionAsController();
       break;
     case ADMINISTRATOR:
@@ -1376,12 +1385,25 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     }
   }
 
-  void handleNewSessionAsParticipant(final String sessionId) throws Exception {
+  private void handleNewSessionInManagementMode(String sessionId,
+      LiveInstance.LiveInstanceStatus liveInstanceStatus) throws Exception {
+    LOG.info("Skip reset because instance is in {} status", LiveInstance.LiveInstanceStatus.PAUSED);
+    if (!InstanceType.PARTICIPANT.equals(_instanceType)
+        && !InstanceType.CONTROLLER_PARTICIPANT.equals(_instanceType)) {
+      return;
+    }
+    // Add STATUS to info provider so the new live instance will have STATUS field
+    handleNewSessionAsParticipant(sessionId,
+        new LiveInstanceStatusInfoProvider(liveInstanceStatus));
+  }
+
+  void handleNewSessionAsParticipant(final String sessionId, LiveInstanceInfoProvider provider)
+      throws Exception {
     if (_participantManager != null) {
       _participantManager.reset();
     }
     _participantManager =
-        new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
+        new ParticipantManager(this, _zkclient, _sessionTimeout, provider,
             _preConnectCallbacks, sessionId, _helixManagerProperty);
     _participantManager.handleNewSession();
   }
@@ -1535,4 +1557,21 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     }
     return zkConnectionInfo;
   }
+
+  /*
+   * Provides live instance status as additional live instance info in the info provider.
+   */
+  private static class LiveInstanceStatusInfoProvider implements LiveInstanceInfoProvider {
+    private final ZNRecord _record;
+
+    public LiveInstanceStatusInfoProvider(LiveInstance.LiveInstanceStatus status) {
+      _record = new ZNRecord("STATUS_PROVIDER");
+      _record.setEnumField(LiveInstance.LiveInstanceProperty.STATUS.name(), status);
+    }
+
+    @Override
+    public ZNRecord getAdditionalLiveInstanceInfo() {
+      return _record;
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 0e8aace..ff970c2 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -56,10 +56,12 @@ import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.ParticipantManager;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -127,6 +129,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   private MessageQueueMonitor _messageQueueMonitor;
   private GenericHelixController _controller;
   private Long _lastSessionSyncTime;
+  private String _freezeSessionId;
+  private LiveInstanceStatus _liveInstanceStatus;
   private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
   private static final String SESSION_SYNC = "SESSION-SYNC";
   /**
@@ -630,6 +634,24 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
             + ", pool: " + pool);
   }
 
+  private void syncFactoryState() {
+    LOG.info("Start to sync factory state");
+    // Lock on the registry to avoid race condition when concurrently calling sync() and reset()
+    synchronized (_hdlrFtyRegistry) {
+      for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : _hdlrFtyRegistry.entrySet()) {
+        MsgHandlerFactoryRegistryItem item = entry.getValue();
+        if (item.factory() != null) {
+          try {
+            item.factory().sync();
+          } catch (Exception ex) {
+            LOG.error("Failed to syncState the factory {} of message type {}.", item.factory(),
+                entry.getKey(), ex);
+          }
+        }
+      }
+    }
+  }
+
   void reset() {
     LOG.info("Reset HelixTaskExecutor");
 
@@ -637,22 +659,24 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       _messageQueueMonitor.reset();
     }
 
-    for (String msgType : _hdlrFtyRegistry.keySet()) {
-      // don't un-register factories, just shutdown all executors
-      ExecutorService pool = _executorMap.remove(msgType);
-      _monitor.removeExecutorMonitor(msgType);
-      if (pool != null) {
-        LOG.info("Reset exectuor for msgType: " + msgType + ", pool: " + pool);
-        shutdownAndAwaitTermination(pool);
-      }
+    synchronized (_hdlrFtyRegistry) {
+      for (String msgType : _hdlrFtyRegistry.keySet()) {
+        // don't un-register factories, just shutdown all executors
+        ExecutorService pool = _executorMap.remove(msgType);
+        _monitor.removeExecutorMonitor(msgType);
+        if (pool != null) {
+          LOG.info("Reset exectuor for msgType: " + msgType + ", pool: " + pool);
+          shutdownAndAwaitTermination(pool);
+        }
 
-      MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
-      if (item.factory() != null) {
-        try {
-          item.factory().reset();
-        } catch (Exception ex) {
-          LOG.error("Failed to reset the factory {} of message type {}.", item.factory().toString(),
-              msgType, ex);
+        MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
+        if (item.factory() != null) {
+          try {
+            item.factory().reset();
+          } catch (Exception ex) {
+            LOG.error("Failed to reset the factory {} of message type {}.", item.factory().toString(),
+                msgType, ex);
+          }
         }
       }
     }
@@ -1068,6 +1092,13 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         }
       }
 
+      if (MessageType.PARTICIPANT_STATUS_CHANGE.name().equals(message.getMsgType())) {
+        LiveInstanceStatus toStatus = LiveInstanceStatus.valueOf(message.getToState());
+        changeParticipantStatus(instanceName, toStatus, manager);
+        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED);
+        return true;
+      }
+
       _monitor.reportReceivedMessage(message);
     } catch (Exception e) {
       LOG.error("Failed to process the message {}. Deleting the message from ZK. Exception: {}",
@@ -1305,6 +1336,56 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     return String.format("%s_%s", resourceName, partitionName);
   }
 
+  private void changeParticipantStatus(String instanceName,
+      LiveInstance.LiveInstanceStatus toStatus, HelixManager manager) {
+    if (toStatus == null) {
+      LOG.warn("To status is null! Skip participant status change.");
+      return;
+    }
+
+    LOG.info("Changing participant {} status to {} from {}", instanceName, toStatus,
+        _liveInstanceStatus);
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    String sessionId = manager.getSessionId();
+    String path = accessor.keyBuilder().liveInstance(instanceName).getPath();
+    boolean success = false;
+
+    switch (toStatus) {
+      case PAUSED:
+        _freezeSessionId = sessionId;
+        _liveInstanceStatus = toStatus;
+        // Entering freeze mode, update live instance status.
+        // If the update fails, another new freeze message will be sent by controller.
+        success = accessor.getBaseDataAccessor().update(path, record -> {
+          record.setEnumField(LiveInstance.LiveInstanceProperty.STATUS.name(), toStatus);
+          return record;
+        }, AccessOption.EPHEMERAL);
+        break;
+      case NORMAL:
+        // Exiting freeze mode
+        // session changed, should call state model sync
+        if (_freezeSessionId != null && !_freezeSessionId.equals(sessionId)) {
+          syncFactoryState();
+          ParticipantManager.carryOverPreviousCurrentState(accessor, instanceName, sessionId,
+              manager.getStateMachineEngine(), false);
+        }
+        _freezeSessionId = null;
+        _liveInstanceStatus = null;
+        success = accessor.getBaseDataAccessor().update(path, record -> {
+          // Remove the status field for backwards compatibility
+          record.getSimpleFields().remove(LiveInstance.LiveInstanceProperty.STATUS.name());
+          return record;
+        }, AccessOption.EPHEMERAL);
+        break;
+      default:
+        LOG.warn("To status {} is not supported", toStatus);
+        break;
+    }
+
+    LOG.info("Changed participant {} status to {}. FreezeSessionId={}, update success={}",
+        instanceName, _liveInstanceStatus, _freezeSessionId, success);
+  }
+
   private String getStateTransitionType(String prefix, String fromState, String toState) {
     if (prefix == null || fromState == null || toState == null) {
       return null;
@@ -1316,6 +1397,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     return MessageType.STATE_TRANSITION.name() + "." + resourceName;
   }
 
+  public LiveInstanceStatus getLiveInstanceStatus() {
+    return _liveInstanceStatus;
+  }
+
   private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
       String instanceName) {
     if (HelixUtil.removeMessageFromZK(accessor, message, instanceName)) {
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
index acbeadc..b402c29 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
@@ -21,6 +21,8 @@ package org.apache.helix.messaging.handling;
 
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Deprecated
 public interface MessageHandlerFactory {
@@ -37,4 +39,12 @@ public interface MessageHandlerFactory {
   String getMessageType();
 
   void reset();
+
+  default void sync() {
+    LogHolder.LOG.warn("Invoked default sync() without any operation");
+  }
+}
+
+final class LogHolder {
+  static final Logger LOG = LoggerFactory.getLogger(MessageHandlerFactory.class);
 }
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 4757c44..e421638 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.helix.HelixConstants;
@@ -147,17 +148,31 @@ public class HelixStateMachineEngine implements StateMachineEngine {
   @Override
   public void reset() {
     logger.info("Resetting HelixStateMachineEngine");
+    loopStateModelFactories(stateModel -> {
+      stateModel.reset();
+      String initialState = _stateModelParser.getInitialState(stateModel.getClass());
+      stateModel.updateState(initialState);
+    });
+    logger.info("Successfully reset HelixStateMachineEngine");
+  }
+
+  @Override
+  public void sync() {
+    logger.info("Syncing HelixStateMachineEngine");
+    loopStateModelFactories(StateModel::syncState);
+    logger.info("Successfully synced HelixStateMachineEngine");
+  }
+
+  private void loopStateModelFactories(Consumer<StateModel> consumer) {
     for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
         .values()) {
       for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
         for (String resourceName : stateModelFactory.getResourceSet()) {
           for (String partitionKey : stateModelFactory.getPartitionSet(resourceName)) {
-            logger.info("Resetting {}::{}", resourceName, partitionKey);
+            logger.info("Operating on {}::{}", resourceName, partitionKey);
             StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
             if (stateModel != null) {
-              stateModel.reset();
-              String initialState = _stateModelParser.getInitialState(stateModel.getClass());
-              stateModel.updateState(initialState);
+              consumer.accept(stateModel);
               // TODO probably should update the state on ZK. Shi confirm what needs
               // to be done here.
             } else {
@@ -170,15 +185,13 @@ public class HelixStateMachineEngine implements StateMachineEngine {
               // We may need to add more processing here to make sure things are being set to
               // initialState. Otherwise, there might be inconsistencies that might cause partitions
               // to be stuck in some state (because reset() would be a NOP here)
-              logger.warn(
-                  "Failed to reset due to StateModel being null! Resource: {}, Partition: {}",
+              logger.warn("Failed operation due to StateModel being null! Resource: {}, Partition: {}",
                   resourceName, partitionKey);
             }
           }
         }
       }
     }
-    logger.info("Successfully reset HelixStateMachineEngine");
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index 2c88a68..143c14a 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -63,6 +63,13 @@ public abstract class StateModel {
   }
 
   /**
+   * Called when cluster is recovering from freeze mode if session changed during freeze mode.
+   */
+  public void syncState() {
+    logger.warn("Default syncState method invoked without any operation.");
+  }
+
+  /**
    * Called when the state model is reset
    */
   public void reset() {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestParticipantFreeze.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestParticipantFreeze.java
new file mode 100644
index 0000000..fedee37
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestParticipantFreeze.java
@@ -0,0 +1,291 @@
+package org.apache.helix.integration.paticipant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.util.MessageUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestParticipantFreeze extends ZkTestBase {
+  private HelixManager _manager;
+  private HelixDataAccessor _accessor;
+  private PropertyKey.Builder _keyBuilder;
+  private String _clusterName;
+  private int _numNodes;
+  private String _resourceName;
+  private String _instanceName;
+  private MockParticipantManager[] _participants;
+  // current states in participant[0]
+  private List<CurrentState> _originCurStates;
+  private String _originSession;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _clusterName = "CLUSTER_" + TestHelper.getTestClassName();
+    _numNodes = 3;
+    _resourceName = "TestDB";
+    _participants = new MockParticipantManager[_numNodes];
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        _resourceName, // resource name prefix
+        1, // resources
+        1, // partitions per resource
+        _numNodes, // number of nodes
+        3, // replicas
+        "MasterSlave", true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _accessor = _manager.getHelixDataAccessor();
+    _keyBuilder = _accessor.keyBuilder();
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < _numNodes; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+      _participants[i].syncStart();
+    }
+    _instanceName = _participants[0].getInstanceName();
+
+    Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName)));
+
+    // We just need controller to rebalance the cluster once to get current states.
+    controller.syncStop();
+
+    _originSession = _participants[0].getSessionId();
+    _originCurStates =
+        _accessor.getChildValues(_keyBuilder.currentStates(_instanceName, _originSession), false);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _manager.disconnect();
+    Arrays.stream(_participants).forEach(ClusterManager::syncStop);
+    deleteCluster(_clusterName);
+  }
+
+  /*
+   * Live instance is not frozen and does not have a frozen status field
+   */
+  @Test
+  public void testNormalLiveInstanceStatus() {
+    LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(_instanceName));
+    Assert.assertNull(liveInstance.getStatus());
+  }
+
+  @Test(dependsOnMethods = "testNormalLiveInstanceStatus")
+  public void testFreezeParticipant() throws Exception {
+    freezeParticipant(_participants[0]);
+  }
+
+  // Simulates an instance restart, after which the in-memory status is gone.
+  // When the instance comes back alive, it resets the state model, carries over
+  // the current states and sets them to the init state.
+  @Test(dependsOnMethods = "testFreezeParticipant")
+  public void testRestartParticipantWhenFrozen() throws Exception {
+    String instanceName = _participants[1].getInstanceName();
+    List<CurrentState> originCurStates = _accessor
+        .getChildValues(_keyBuilder.currentStates(instanceName, _participants[1].getSessionId()),
+            false);
+    String oldSession = _participants[1].getSessionId();
+    freezeParticipant(_participants[1]);
+
+    // Restart participants[1]
+    _participants[1].syncStop();
+    _participants[1] = new MockParticipantManager(ZK_ADDR, _participants[1].getClusterName(),
+        instanceName);
+    _participants[1].syncStart();
+
+    Assert.assertTrue(TestHelper.verify(() ->
+            _gZkClient.exists(_keyBuilder.liveInstance(instanceName).getPath()),
+        TestHelper.WAIT_DURATION));
+    LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(instanceName));
+
+    // New live instance ephemeral node
+    Assert.assertEquals(liveInstance.getEphemeralOwner(), _participants[1].getSessionId());
+    // Status is not frozen: the controller is not running, so no freeze message is sent.
+    verifyLiveInstanceStatus(_participants[1], null);
+
+    // Old session current state is deleted because of current state carry-over
+    Assert.assertTrue(TestHelper.verify(
+        () -> !_gZkClient.exists(_keyBuilder.currentStates(instanceName, oldSession).getPath()),
+        TestHelper.WAIT_DURATION));
+
+    // Current states are set to init states (OFFLINE)
+    List<CurrentState> curStates = _accessor
+        .getChildValues(_keyBuilder.currentStates(instanceName, _participants[1].getSessionId()),
+            false);
+    Assert.assertEquals(curStates.size(), 1);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (CurrentState cs : originCurStates) {
+        String stateModelDefRef = cs.getStateModelDefRef();
+        for (String partition : cs.getPartitionStateMap().keySet()) {
+          StateModelDefinition stateModelDef =
+              _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
+          String initState = stateModelDef.getInitialState();
+          if (!initState.equals(curStates.get(0).getPartitionStateMap().get(partition))) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  // Simulates a session expiry during which the in-memory status is still kept.
+  // No state model reset or current state carry-over happens.
+  @Test(dependsOnMethods = "testRestartParticipantWhenFrozen")
+  public void testHandleNewSessionWhenFrozen() throws Exception {
+    // there are current states for the resource
+    Assert.assertFalse(_originCurStates.isEmpty());
+
+    ZkTestHelper.expireSession(_participants[0].getZkClient());
+    String currentSession = _participants[0].getSessionId();
+    Assert.assertFalse(_originSession.equals(currentSession));
+
+    Assert.assertTrue(TestHelper.verify(() ->
+            _gZkClient.exists(_keyBuilder.liveInstance(_instanceName).getPath()),
+            TestHelper.WAIT_DURATION));
+    LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(_instanceName));
+
+    // New live instance ephemeral node keeps the frozen status (PAUSED)
+    Assert.assertFalse(_originSession.equals(liveInstance.getEphemeralOwner()));
+    Assert.assertEquals(liveInstance.getStatus(), LiveInstance.LiveInstanceStatus.PAUSED);
+
+    // New session path does not exist since no current state carry over for the current session.
+    Assert.assertFalse(
+        _gZkClient.exists(_keyBuilder.currentStates(_instanceName, currentSession).getPath()));
+    // Old session current states still exist.
+    Assert.assertTrue(
+        _gZkClient.exists(_keyBuilder.currentStates(_instanceName, _originSession).getPath()));
+  }
+
+  @Test(dependsOnMethods = "testHandleNewSessionWhenFrozen")
+  public void testUnfreezeParticipant() throws Exception {
+    Message unfreezeMessage = MessageUtil
+        .createStatusChangeMessage(LiveInstance.LiveInstanceStatus.PAUSED,
+            LiveInstance.LiveInstanceStatus.NORMAL, _manager.getInstanceName(),
+            _manager.getSessionId(), _instanceName, _participants[0].getSessionId());
+    List<PropertyKey> keys = Collections
+        .singletonList(_keyBuilder.message(unfreezeMessage.getTgtName(), unfreezeMessage.getId()));
+
+    boolean[] success = _accessor.createChildren(keys, Collections.singletonList(unfreezeMessage));
+    Assert.assertTrue(success[0]);
+
+    // Live instance status NORMAL is represented as a null value in both memory and zk.
+    // Once the live instance status is updated, the unfreeze process is complete.
+    verifyLiveInstanceStatus(_participants[0], null);
+    // Unfreeze message is correctly deleted
+    Assert.assertNull(
+        _accessor.getProperty(_keyBuilder.message(_instanceName, unfreezeMessage.getId())));
+
+    // current state is carried over
+    List<CurrentState> curStates = _accessor
+        .getChildValues(_keyBuilder.currentStates(_instanceName, _participants[0].getSessionId()),
+            false);
+    Assert.assertFalse(curStates.isEmpty());
+    // The original current states are deleted.
+    Assert.assertFalse(
+        _gZkClient.exists(_keyBuilder.currentStates(_instanceName, _originSession).getPath()));
+
+    // current states should be the same as the original current states
+    // with CS carry-over when unfreezing
+    Assert.assertTrue(verifyCurrentStates(_originCurStates, curStates));
+  }
+
+  private void verifyLiveInstanceStatus(MockParticipantManager participant,
+      LiveInstance.LiveInstanceStatus status) throws Exception {
+    // Live instance status matches the expected status in both memory and zk
+    Assert.assertTrue(TestHelper.verify(() -> {
+      LiveInstance.LiveInstanceStatus inMemoryLiveInstanceStatus =
+          ((DefaultMessagingService) participant.getMessagingService()).getExecutor()
+              .getLiveInstanceStatus();
+      return inMemoryLiveInstanceStatus == status;
+    }, TestHelper.WAIT_DURATION));
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      LiveInstance liveInstance =
+          _accessor.getProperty(_keyBuilder.liveInstance(participant.getInstanceName()));
+      return liveInstance.getStatus() == status;
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  private boolean verifyCurrentStates(List<CurrentState> originCurStates,
+      List<CurrentState> curStates) {
+    for (CurrentState ocs : originCurStates) {
+      for (CurrentState cs : curStates) {
+        if (cs.getId().equals(ocs.getId())
+            && !cs.getPartitionStateMap().equals(ocs.getPartitionStateMap())) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  private void freezeParticipant(MockParticipantManager participant) throws Exception {
+    Message freezeMessage = MessageUtil
+        .createStatusChangeMessage(LiveInstance.LiveInstanceStatus.NORMAL,
+            LiveInstance.LiveInstanceStatus.PAUSED, _manager.getInstanceName(),
+            _manager.getSessionId(), participant.getInstanceName(), participant.getSessionId());
+
+    List<PropertyKey> keys = Collections
+        .singletonList(_keyBuilder.message(freezeMessage.getTgtName(), freezeMessage.getId()));
+
+    boolean[] success = _accessor.createChildren(keys, Collections.singletonList(freezeMessage));
+    Assert.assertTrue(success[0]);
+
+    // Live instance status is frozen in both memory and zk
+    verifyLiveInstanceStatus(participant, LiveInstance.LiveInstanceStatus.PAUSED);
+    // Freeze message is correctly deleted
+    Assert.assertNull(_accessor
+        .getProperty(_keyBuilder.message(participant.getInstanceName(), freezeMessage.getId())));
+  }
+}