You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2021/06/22 23:27:14 UTC

[helix] 05/06: Add message util to create messages (#1796)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 25117d9d0b98b048d7955655c47fa89837bbe361
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Mon Jun 14 21:32:25 2021 -0700

    Add message util to create messages (#1796)
    
    Message creation methods are currently private to the message generation phase. The management mode stage will also need these methods to create state transition (ST) cancellation and participant status change messages.
    A shared message util serves both purposes.
    
    This commit moves the common message creation logic to a message util so multiple stages can reuse the code.
---
 .../controller/stages/MessageGenerationPhase.java  |  97 +++-----------
 .../java/org/apache/helix/util/MessageUtil.java    | 139 +++++++++++++++++++++
 .../TestStateTransitionAppFailureHandling.java     |   5 +-
 3 files changed, 157 insertions(+), 84 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 836e5df..38d9908 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
@@ -45,13 +44,12 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.MessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,8 +68,6 @@ public class MessageGenerationPhase extends AbstractBaseStage {
       .getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
   private final static String PENDING_MESSAGE = "pending message";
   private final static String STALE_MESSAGE = "stale message";
-  // TODO: Make the message retry count configurable through the Cluster Config or IdealStates.
-  public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
 
   private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);
 
@@ -215,10 +211,12 @@ public class MessageGenerationPhase extends AbstractBaseStage {
         if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) {
           if (shouldCreateSTCancellation(pendingMessage, desiredState,
               stateModelDef.getInitialState())) {
-            message = createStateTransitionCancellationMessage(manager, resource,
-                partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
-                stateModelDef.getId(), pendingMessage.getFromState(), pendingMessage.getToState(),
-                null, cancellationMessage, isCancellationEnabled, currentState);
+            message = MessageUtil
+                .createStateTransitionCancellationMessage(manager.getInstanceName(),
+                    manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
+                    sessionIdMap.get(instanceName), stateModelDef.getId(),
+                    pendingMessage.getFromState(), pendingMessage.getToState(), null,
+                    cancellationMessage, isCancellationEnabled, currentState);
           }
         } else {
           if (nextState == null) {
@@ -236,9 +234,10 @@ public class MessageGenerationPhase extends AbstractBaseStage {
                     cancellationMessage, isCancellationEnabled);
           } else {
             // Create new state transition message
-            message = createStateTransitionMessage(manager, resource, partition.getPartitionName(),
-                instanceName, currentState, nextState, sessionIdMap.get(instanceName),
-                stateModelDef.getId());
+            message = MessageUtil
+                .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
+                    resource, partition.getPartitionName(), instanceName, currentState, nextState,
+                    sessionIdMap.get(instanceName), stateModelDef.getId());
 
             if (logger.isDebugEnabled()) {
               LogUtil.logDebug(logger, _eventId, String.format(
@@ -331,10 +330,10 @@ public class MessageGenerationPhase extends AbstractBaseStage {
                 + instanceName + ", pendingState: " + pendingState + ", currentState: "
                 + currentState + ", nextState: " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
 
-        message = createStateTransitionCancellationMessage(manager, resource,
-            partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName),
-            stateModelDef.getId(), pendingMessage.getFromState(), pendingState, nextState,
-            cancellationMessage, isCancellationEnabled, currentState);
+        message = MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(),
+            manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
+            sessionIdMap.get(instanceName), stateModelDef.getId(), pendingMessage.getFromState(),
+            pendingState, nextState, cancellationMessage, isCancellationEnabled, currentState);
       }
     }
     return message;
@@ -417,72 +416,6 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     }
   }
 
-  private Message createStateTransitionMessage(HelixManager manager, Resource resource,
-      String partitionName, String instanceName, String currentState, String nextState,
-      String sessionId, String stateModelDefName) {
-    String uuid = UUID.randomUUID().toString();
-    String managerSessionId = manager.getSessionId();
-    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-    message.setSrcName(manager.getInstanceName());
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setPartitionName(partitionName);
-    message.setResourceName(resource.getResourceName());
-    message.setFromState(currentState);
-    message.setToState(nextState);
-    message.setTgtSessionId(sessionId);
-    message.setSrcSessionId(managerSessionId);
-    message.setExpectedSessionId(managerSessionId);
-    message.setStateModelDef(stateModelDefName);
-    message.setStateModelFactoryName(resource.getStateModelFactoryname());
-    message.setBucketSize(resource.getBucketSize());
-    // Set the retry count for state transition messages.
-    // TODO: make the retry count configurable in ClusterConfig or IdealState
-    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
-
-    if (resource.getResourceGroupName() != null) {
-      message.setResourceGroupName(resource.getResourceGroupName());
-    }
-    if (resource.getResourceTag() != null) {
-      message.setResourceTag(resource.getResourceTag());
-    }
-
-    return message;
-  }
-
-  private Message createStateTransitionCancellationMessage(HelixManager manager, Resource resource,
-      String partitionName, String instanceName, String sessionId, String stateModelDefName,
-      String fromState, String toState, String nextState, Message cancellationMessage,
-      boolean isCancellationEnabled, String currentState) {
-
-    if (isCancellationEnabled && cancellationMessage == null) {
-      logger.info("Event {} : Send cancellation message of the state transition for {}.{} on {}, "
-              + "currentState: {}, nextState: {},  toState: {}",
-          _eventId, resource.getResourceName(), partitionName, instanceName,
-          currentState, nextState == null ? "N/A" : nextState, toState);
-
-      String uuid = UUID.randomUUID().toString();
-      String managerSessionId = manager.getSessionId();
-      Message message = new Message(MessageType.STATE_TRANSITION_CANCELLATION, uuid);
-      message.setSrcName(manager.getInstanceName());
-      message.setTgtName(instanceName);
-      message.setMsgState(MessageState.NEW);
-      message.setPartitionName(partitionName);
-      message.setResourceName(resource.getResourceName());
-      message.setFromState(fromState);
-      message.setToState(toState);
-      message.setTgtSessionId(sessionId);
-      message.setSrcSessionId(managerSessionId);
-      message.setExpectedSessionId(managerSessionId);
-      message.setStateModelDef(stateModelDefName);
-      message.setStateModelFactoryName(resource.getStateModelFactoryname());
-      message.setBucketSize(resource.getBucketSize());
-      return message;
-    }
-
-    return null;
-  }
-
   private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfig,
       String currentState, String nextState, IdealState idealState, Partition partition) {
     StateTransitionTimeoutConfig stateTransitionTimeoutConfig =
diff --git a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
new file mode 100644
index 0000000..94de833
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
@@ -0,0 +1,139 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.UUID;
+
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Message utilities for operating on messages, such as creating them.
+ */
+public class MessageUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(MessageUtil.class);
+
+  // TODO: Make the message retry count configurable through the Cluster Config or IdealStates.
+  public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
+
+  public static Message createStateTransitionCancellationMessage(String srcInstanceName,
+      String srcSessionId, Resource resource, String partitionName, String instanceName,
+      String sessionId, String stateModelDefName, String fromState, String toState,
+      String nextState, Message cancellationMessage, boolean isCancellationEnabled,
+      String currentState) {
+    if (isCancellationEnabled && cancellationMessage == null) {
+      LOG.info("Create cancellation message of the state transition for {}.{} on {}, "
+              + "currentState: {}, nextState: {},  toState: {}", resource.getResourceName(),
+          partitionName, instanceName, currentState, nextState == null ? "N/A" : nextState,
+          toState);
+
+      Message message =
+          createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
+              srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
+              nextState, sessionId, stateModelDefName);
+
+      message.setFromState(fromState);
+      message.setToState(toState);
+      return message;
+    }
+
+    return null;
+  }
+
+  public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
+      Resource resource, String partitionName, String instanceName, String currentState,
+      String nextState, String tgtSessionId, String stateModelDefName) {
+    Message message =
+        createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
+            srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId,
+            stateModelDefName);
+
+    // Set the retry count for state transition messages.
+    // TODO: make the retry count configurable in ClusterConfig or IdealState
+    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+
+    if (resource.getResourceGroupName() != null) {
+      message.setResourceGroupName(resource.getResourceGroupName());
+    }
+    if (resource.getResourceTag() != null) {
+      message.setResourceTag(resource.getResourceTag());
+    }
+
+    return message;
+  }
+
+  /**
+   * Creates a message to change participant status
+   * {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
+   *
+   * @param currentState current status of the live instance
+   * @param nextState status that the live instance will be changed to
+   * @param srcInstanceName source instance name
+   * @param srcSessionId session id for the source instance
+   * @param tgtInstanceName target instance name
+   * @param tgtSessionId target instance session id
+   * @return participant status change message
+   */
+  public static Message createStatusChangeMessage(LiveInstance.LiveInstanceStatus currentState,
+      LiveInstance.LiveInstanceStatus nextState, String srcInstanceName, String srcSessionId,
+      String tgtInstanceName, String tgtSessionId) {
+    return createBasicMessage(Message.MessageType.PARTICIPANT_STATUS_CHANGE, srcInstanceName,
+        srcSessionId, tgtInstanceName, tgtSessionId, currentState.name(), nextState.name());
+  }
+
+  /* Creates a message that has only the minimum required fields. */
+  private static Message createBasicMessage(Message.MessageType messageType, String srcInstanceName,
+      String srcSessionId, String tgtInstanceName, String tgtSessionId, String currentState,
+      String nextState) {
+    String uuid = UUID.randomUUID().toString();
+
+    Message message = new Message(messageType, uuid);
+    message.setSrcName(srcInstanceName);
+    message.setTgtName(tgtInstanceName);
+    message.setMsgState(Message.MessageState.NEW);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(tgtSessionId);
+    message.setSrcSessionId(srcSessionId);
+    message.setExpectedSessionId(srcSessionId);
+
+    return message;
+  }
+
+  /* Creates a state transition or state transition cancellation message. */
+  private static Message createStateTransitionMessage(Message.MessageType messageType,
+      String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
+      String instanceName, String currentState, String nextState, String tgtSessionId,
+      String stateModelDefName) {
+    Message message =
+        createBasicMessage(messageType, srcInstanceName, srcSessionId, instanceName, tgtSessionId,
+            currentState, nextState);
+    message.setPartitionName(partitionName);
+    message.setStateModelDef(stateModelDefName);
+    message.setResourceName(resource.getResourceName());
+    message.setStateModelFactoryName(resource.getStateModelFactoryname());
+    message.setBucketSize(resource.getBucketSize());
+
+    return message;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
index 12ede56..e97424c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
@@ -37,6 +37,7 @@ import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.MessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -122,7 +123,7 @@ public class TestStateTransitionAppFailureHandling extends ZkStandAloneCMTestBas
       // Check if the factory has tried enough times before fail the message.
       Assert.assertEquals(retryCountUntilSucceed - retryFactoryMap.get(instanceName)
           .getRemainingRetryCountUntilSucceed(), instanceMessages.size()
-          * MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+          * MessageUtil.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
     }
 
     // Verify that the partition is not initialized.
@@ -146,7 +147,7 @@ public class TestStateTransitionAppFailureHandling extends ZkStandAloneCMTestBas
     // Make the mock StateModelFactory return handler before last retry. So it will successfully
     // finish handler initialization.
     int retryCountUntilSucceed =
-        MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT - 1;
+        MessageUtil.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT - 1;
     Map<String, RetryStateModelFactory> retryFactoryMap = resetParticipants(retryCountUntilSucceed);
 
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);