You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/01/31 02:08:46 UTC

[helix] branch master updated: Fix for - Stale message redundant logs

This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new a478bbc  Fix for - Stale message redundant logs
a478bbc is described below

commit a478bbcfc8161f3f7ed62b1e1a28cc4b0e68e863
Author: desaikomal <98...@users.noreply.github.com>
AuthorDate: Sun Jan 30 18:08:42 2022 -0800

    Fix for - Stale message redundant logs
    
    Avoid printing redundant log messages for unrelated partitions and resources.
---
 .../controller/stages/MessageGenerationPhase.java  | 69 ++++++++++++----------
 1 file changed, 38 insertions(+), 31 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 38d9908..98313b9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -77,7 +77,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     _eventId = event.getEventId();
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
+    BaseControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
     Map<String, Resource> resourceMap =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
     CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
@@ -172,15 +173,14 @@ public class MessageGenerationPhase extends AbstractBaseStage {
           nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
 
           if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
-            LogUtil.logDebug(logger, _eventId,
-                String.format(
-                    "No current state for partition %s in resource %s, skip the drop message",
+            LogUtil.logDebug(logger, _eventId, String
+                .format("No current state for partition %s in resource %s, skip the drop message",
                     partition.getPartitionName(), resourceName));
 
             message =
-                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
-                    manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
-                    cancellationMessage, isCancellationEnabled);
+                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState,
+                    pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
+                    stateModelDef, cancellationMessage, isCancellationEnabled);
             addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState,
                 resourceName, partition, currentState, nextState);
 
@@ -200,9 +200,13 @@ public class MessageGenerationPhase extends AbstractBaseStage {
         }
 
         for (Message staleMessage : staleMessages) {
-          if (System.currentTimeMillis() - currentStateOutput
-              .getEndTime(resourceName, partition, instanceName)
-              > DEFAULT_OBSELETE_MSG_PURGE_DELAY) {
+          // staleMessage can be simple or batch mode
+          if ((System.currentTimeMillis() - currentStateOutput
+              .getEndTime(resourceName, partition, instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY)
+              && staleMessage.getResourceName().equals(resourceName) && (
+              staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
+                  staleMessage.getBatchMessageMode() && staleMessage.getPartitionNames()
+                      .contains(partition.getPartitionName())))) {
             logAndAddToCleanUp(messagesToCleanUp, staleMessage, instanceName, resourceName,
                 partition, currentState, STALE_MESSAGE);
           }
@@ -229,9 +233,9 @@ public class MessageGenerationPhase extends AbstractBaseStage {
 
           if (pendingMessage != null) {
             message =
-                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState, pendingMessage,
-                    manager, resource, partition, sessionIdMap, instanceName, stateModelDef,
-                    cancellationMessage, isCancellationEnabled);
+                generateCancellationMessageForPendingMessage(desiredState, currentState, nextState,
+                    pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
+                    stateModelDef, cancellationMessage, isCancellationEnabled);
           } else {
             // Create new state transition message
             message = MessageUtil
@@ -284,9 +288,9 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     // Cancel the ST except below scenarios:
     // 1. pending message toState is desired state
     // 2. pending message is an ERROR reset: ERROR -> initState (eg. OFFLINE)
-    return !desiredState.equalsIgnoreCase(pendingMessage.getToState())
-        && !(HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState())
-        && initialState.equals(pendingMessage.getToState()));
+    return !desiredState.equalsIgnoreCase(pendingMessage.getToState()) && !(
+        HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState()) && initialState
+            .equals(pendingMessage.getToState()));
   }
 
   private void logAndAddToCleanUp(Map<String, Map<String, Message>> messagesToCleanUp,
@@ -303,11 +307,12 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     messagesToCleanUp.get(instanceName).put(message.getMsgId(), message);
   }
 
-  private Message generateCancellationMessageForPendingMessage(final String desiredState, final String currentState,
-      final String nextState, final Message pendingMessage, final HelixManager manager,
-      final Resource resource, final Partition partition, final Map<String, String> sessionIdMap,
-      final String instanceName, final StateModelDefinition stateModelDef,
-      final Message cancellationMessage, final boolean isCancellationEnabled) {
+  private Message generateCancellationMessageForPendingMessage(final String desiredState,
+      final String currentState, final String nextState, final Message pendingMessage,
+      final HelixManager manager, final Resource resource, final Partition partition,
+      final Map<String, String> sessionIdMap, final String instanceName,
+      final StateModelDefinition stateModelDef, final Message cancellationMessage,
+      final boolean isCancellationEnabled) {
 
     Message message = null;
 
@@ -316,19 +321,20 @@ public class MessageGenerationPhase extends AbstractBaseStage {
       if (nextState.equalsIgnoreCase(pendingState)) {
         LogUtil.logInfo(logger, _eventId,
             "Message already exists for " + instanceName + " to transit " + resource
-                .getResourceName() + "." + partition.getPartitionName() + " from "
-                + currentState + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
+                .getResourceName() + "." + partition.getPartitionName() + " from " + currentState
+                + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
       } else if (currentState.equalsIgnoreCase(pendingState)) {
         LogUtil.logInfo(logger, _eventId,
             "Message hasn't been removed for " + instanceName + " to transit " + resource
-                .getResourceName() + "." + partition.getPartitionName() + " to "
-                + pendingState + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage.isRelayMessage());
+                .getResourceName() + "." + partition.getPartitionName() + " to " + pendingState
+                + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage
+                .isRelayMessage());
       } else {
         LogUtil.logInfo(logger, _eventId,
-            "IdealState changed before state transition completes for " + resource
-                .getResourceName() + "." + partition.getPartitionName() + " on "
-                + instanceName + ", pendingState: " + pendingState + ", currentState: "
-                + currentState + ", nextState: " + nextState + ", isRelay: " + pendingMessage.isRelayMessage());
+            "IdealState changed before state transition completes for " + resource.getResourceName()
+                + "." + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
+                + pendingState + ", currentState: " + currentState + ", nextState: " + nextState
+                + ", isRelay: " + pendingMessage.isRelayMessage());
 
         message = MessageUtil.createStateTransitionCancellationMessage(manager.getInstanceName(),
             manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
@@ -353,8 +359,9 @@ public class MessageGenerationPhase extends AbstractBaseStage {
         }
       }
 
-      int timeout = getTimeOut(cache.getClusterConfig(), cache.getResourceConfig(resourceName),
-          currentState, nextState, idealState, partition);
+      int timeout =
+          getTimeOut(cache.getClusterConfig(), cache.getResourceConfig(resourceName), currentState,
+              nextState, idealState, partition);
       if (timeout > 0) {
         message.setExecutionTimeout(timeout);
       }