Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/10/27 22:40:59 UTC

[GitHub] [helix] jiajunwang opened a new pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

jiajunwang opened a new pull request #1489:
URL: https://github.com/apache/helix/pull/1489


   ### Issues
   
   - [X] My PR addresses the following Helix issues and references them in the PR description:
   
   This PR is related to #1488: the related code needs to be cleaned up before the further fix can be made. This PR does not address the issue itself.
   
   ### Description
   
   - [X] Here are some details about my PR, including screenshots of any UI changes:
   
   Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.
   There is a minor code logic change for optimization, but there is no business logic change in this PR.
   
   ### Tests
   
   - [X] The following tests are written for this issue:
   
   NA
   
   - [ ] The following is the result of the "mvn test" command on the appropriate module:
   
   running...
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513935472



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
+      NotificationContext changeContext, HelixManager manager, String sessionId,
+      Map<String, MessageHandler> stateTransitionHandlers) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();

Review comment:
       If this fails, it means we lost the ZK connection and the wait timed out. In this case, I think it would be better to throw the exception instead of proceeding. handleUnprocessableMessage would fail too because the ZK connection is broken.






[GitHub] [helix] dasahcc commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
dasahcc commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513783040



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -192,43 +191,38 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadpoolSize) {
-    if (factory instanceof  MultiTypeMessageHandlerFactory) {
+    if (factory instanceof MultiTypeMessageHandlerFactory) {
       if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
         throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
             + ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
       }
     } else {
       if (!factory.getMessageType().equals(type)) {
         throw new HelixException(
-            "Message factory type mismatch. Type: " + type + ", factory: " + factory.getMessageType());
+            "Message factory type mismatch. Type: " + type + ", factory: " + factory
+                .getMessageType());
       }
     }
 
     _isShuttingDown = false;
 
-    MsgHandlerFactoryRegistryItem newItem = new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
+    MsgHandlerFactoryRegistryItem newItem =
+        new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
     MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem);
     if (prevItem == null) {
-      ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize, new ThreadFactory() {
-        @Override public Thread newThread(Runnable r) {
-          return new Thread(r, "HelixTaskExecutor-message_handle_thread_" + thread_uid.getAndIncrement());
-        }
-      });
-      ExecutorService prevExecutor = _executorMap.putIfAbsent(type, newPool);
-      if (prevExecutor != null) {
-        LOG.warn("Skip creating a new thread pool for type: " + type + ", already existing pool: "
-            + prevExecutor + ", isShutdown: " + prevExecutor.isShutdown());
-        newPool.shutdown();
-        newPool = null;
-      } else {
+      _executorMap.computeIfAbsent(type, msgType -> {

Review comment:
       We can use putIfAbsent since we are not using the return value anymore here.
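
For reference, the two calls differ in when the value is constructed. The following is a minimal sketch, not the Helix code: the class name `ExecutorMapDemo`, the `CREATED` counter, and the `newPool` helper are hypothetical and exist only for illustration. It assumes a `ConcurrentHashMap` keyed by message type, like `_executorMap`. With `putIfAbsent` the pool is built before the map is consulted, so a redundant pool may have to be shut down by the caller; with `computeIfAbsent` the factory lambda only runs when the key is absent.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorMapDemo {
  // Hypothetical counter: how many pools were actually constructed.
  static final AtomicInteger CREATED = new AtomicInteger();

  // Hypothetical factory standing in for the thread pool creation in the PR.
  static ExecutorService newPool() {
    CREATED.incrementAndGet();
    return Executors.newFixedThreadPool(1);
  }

  /** Registers the same type twice with putIfAbsent; returns the number of pools built. */
  static int registerTwiceWithPutIfAbsent() {
    CREATED.set(0);
    ConcurrentMap<String, ExecutorService> map = new ConcurrentHashMap<>();
    for (int i = 0; i < 2; i++) {
      ExecutorService candidate = newPool(); // built eagerly, before the map is consulted
      ExecutorService prev = map.putIfAbsent("STATE_TRANSITION", candidate);
      if (prev != null) {
        candidate.shutdown(); // the redundant pool must be cleaned up by the caller
      }
    }
    map.values().forEach(ExecutorService::shutdown);
    return CREATED.get();
  }

  /** Registers the same type twice with computeIfAbsent; returns the number of pools built. */
  static int registerTwiceWithComputeIfAbsent() {
    CREATED.set(0);
    ConcurrentMap<String, ExecutorService> map = new ConcurrentHashMap<>();
    for (int i = 0; i < 2; i++) {
      // The lambda is only invoked when no mapping exists for the key.
      map.computeIfAbsent("STATE_TRANSITION", type -> newPool());
    }
    map.values().forEach(ExecutorService::shutdown);
    return CREATED.get();
  }
}
```

Whether the eager construction matters depends on how often a type is registered more than once; the sketch only shows the difference in behavior.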

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkForNoOpMessage(Message message, String instanceName,

Review comment:
       nit: would it be better to name it isNoOpMessage? The word "check" could be confusing here.






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513732287



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -192,43 +191,38 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadpoolSize) {
-    if (factory instanceof  MultiTypeMessageHandlerFactory) {
+    if (factory instanceof MultiTypeMessageHandlerFactory) {
       if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
         throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
             + ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
       }
     } else {
       if (!factory.getMessageType().equals(type)) {
         throw new HelixException(
-            "Message factory type mismatch. Type: " + type + ", factory: " + factory.getMessageType());
+            "Message factory type mismatch. Type: " + type + ", factory: " + factory
+                .getMessageType());

Review comment:
       This PR is all about non-essential changes that prepare for the real business logic changes.
   Non-essential does not mean we are not going to make them.






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513809794



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkForNoOpMessage(Message message, String instanceName,

Review comment:
       This method now also cleans up the message, and it is not easy to move that logic out, so I prefer not to name it as an isXXXX method.
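
The naming concern can be illustrated with a small sketch. It uses hypothetical stand-in types (a plain list as the message store, strings as message ids and types), not the Helix classes, and the method bodies are assumptions made for illustration. A pure `is...` predicate can be called any number of times, while a method that also removes the message has a side effect that the name should signal.

```java
import java.util.ArrayList;
import java.util.List;

class NoOpCheckSketch {
  // Hypothetical stand-in for the message store (ZooKeeper in Helix).
  static final List<String> store = new ArrayList<>();

  /** Pure predicate: no side effects, safe to call repeatedly. */
  static boolean isNoOpMessage(String msgType) {
    return "NO_OP".equalsIgnoreCase(msgType);
  }

  /**
   * Check plus side effect: removes the message from the store when nothing
   * else needs to be done, and returns true in that case.
   */
  static boolean checkAndProcessNoOpMessage(String msgId, String msgType) {
    if (isNoOpMessage(msgType)) {
      store.remove(msgId); // side effect, which is why an is-prefix would mislead
      return true;
    }
    return false;
  }
}
```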






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r514552521



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
+      NotificationContext changeContext, HelixManager manager, String sessionId,
+      Map<String, MessageHandler> stateTransitionHandlers) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();

Review comment:
       This specific case won't cause a retry, since the message won't be removed when zkclient is down.
   statusUpdateUtil is a different story. It should be considered in different PRs.
   
   Based on what we discussed today with @dasahcc and @lei-xia, we agreed that it is not ideal or feasible for us to eliminate all potential Exceptions. And leaving the message in any of the Exception cases is dangerous, IMO.
   So we will take this case by case.
   
   In the following PRs, we will ensure that all Exceptions from application code (behind the interfaces) are handled gracefully. Exceptions from our internal logic will be evaluated one by one in other PRs, if necessary.






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513811045



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -810,164 +799,52 @@ public void onMessage(String instanceName, List<Message> messages,
     Set<String> createCurStateNames = new HashSet<>();
 
     for (Message message : messages) {
-      try {
-        // nop messages are simply removed. It is used to trigger onMessage() in
-        // situations such as register a new message handler factory
-        if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) {
-          LOG.info(
-              "Dropping NO-OP message. mid: " + message.getId() + ", from: " + message.getMsgSrc());
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-          continue;
-        }
-
-        String tgtSessionId = message.getTgtSessionId();
-        // sessionId mismatch normally means message comes from expired session, just remove it
-        if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
-          String warningMessage =
-              "SessionId does NOT match. expected sessionId: " + sessionId
-                  + ", tgtSessionId in message: " + tgtSessionId + ", messageId: "
-                  + message.getMsgId();
-          LOG.warn(warningMessage);
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-          _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, manager);
-
-          // Proactively send a session sync message from participant to controller
-          // upon session mismatch after a new session is established
-          if (manager.getInstanceType() == InstanceType.PARTICIPANT
-              || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
-            if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
-              syncSessionToController(manager);
-            }
-          }
-          continue;
-        }
-
-        if ((manager.getInstanceType() == InstanceType.CONTROLLER
-            || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-            && MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
-          LOG.info(String.format("Controller received PARTICIPANT_SESSION_CHANGE msg from src: %s",
-              message.getMsgSrc()));
-          PropertyKey key = new Builder(manager.getClusterName()).liveInstances();
-          List<LiveInstance> liveInstances =
-              manager.getHelixDataAccessor().getChildValues(key, true);
-          _controller.onLiveInstanceChange(liveInstances, changeContext);
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED);
-          continue;
-        }
-
-        // don't process message that is of READ or UNPROCESSABLE state
-        if (MessageState.NEW != message.getMsgState()) {
-          // It happens because we don't delete message right after
-          // read. Instead we keep it until the current state is updated.
-          // We will read the message again if there is a new message but we
-          // check for the status and ignore if its already read
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Message already read. msgId: " + message.getMsgId());
-          }
-          continue;
-        }
-
-        if (message.isExpired()) {
-          LOG.info(
-              "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc()
-                  + " relayed from: " + message.getRelaySrcHost());
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-          continue;
-        }
-
-        // State Transition Cancellation
-        if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
-          boolean success = cancelNotStartedStateTransition(message, stateTransitionHandlers, accessor, instanceName);
-          if (success) {
-            continue;
-          }
-        }
-
-        _monitor.reportReceivedMessage(message);
-      } catch (Exception e) {
-        LOG.error("Failed to process the message {}. Deleting the message from ZK. Exception: {}",
-            message, e);
-        removeMessageFromTaskAndFutureMap(message);
-        removeMessageFromZK(accessor, message, instanceName);
+      if (checkForNoOpMessage(message, instanceName, changeContext, manager, sessionId,
+          stateTransitionHandlers)) {
+        // skip the following operations for the no-op messages.
         continue;
       }
-
       // create message handlers, if handlers not found, leave its state as NEW
       NotificationContext msgWorkingContext = changeContext.clone();
       try {
-        MessageHandler createHandler = createMessageHandler(message, msgWorkingContext);
-        if (createHandler == null) {
+        MessageHandler msgHandler = createMessageHandler(message, msgWorkingContext);
+        if (msgHandler == null) {
+          // Failed to create message handler, skip processing this message in this callback.
+          // The same message process will be retried in the next round.
           continue;
         }
         if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
             .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
-          String messageTarget =
-              getMessageTarget(message.getResourceName(), message.getPartitionName());
-
-          if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
-              && isStateTransitionInProgress(messageTarget)) {
-
-            String taskId = _messageTaskMap.get(messageTarget);
-            Message msg = _taskMap.get(taskId).getTask().getMessage();
-
-            // If there is another state transition for same partition is going on,
-            // discard the message. Controller will resend if this is a valid message
-            String errMsg = String.format(
-                "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
-                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
-                System.currentTimeMillis(), message.getFromState(), message.getToState());
-            handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
-                instanceName, manager);
-            continue;
-          }
-          if (createHandler instanceof HelixStateTransitionHandler) {
-            // We only check to state if there is no ST task scheduled/executing.
-            HelixStateTransitionHandler.StaleMessageValidateResult result =
-                ((HelixStateTransitionHandler) createHandler).staleMessageValidator();
-            if (!result.isValid) {
-              handleUnprocessableMessage(message, null /* exception */,
-                  result.exception.getMessage(), accessor, instanceName, manager);
-              continue;
-            }
-          }
-          if (stateTransitionHandlers.containsKey(messageTarget)) {
-            // If there are 2 messages in same batch about same partition's state transition,
-            // the later one is discarded
-            Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
-            String errMsg = String.format(
-                "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
-                message.getMsgId(), duplicatedMessage.getFromState(),
-                duplicatedMessage.getToState(), message.getFromState(), message.getToState());
-            handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
-                instanceName, manager);
+          if (validateStateTransitionMessage(message, instanceName, manager,
+              stateTransitionHandlers, msgHandler)) {
+            // Need future process by triggering state transition
+            String msgTarget =
+                getMessageTarget(message.getResourceName(), message.getPartitionName());
+            stateTransitionHandlers.put(msgTarget, msgHandler);
+            stateTransitionContexts.put(msgTarget, msgWorkingContext);
+          } else {
+            // skip the following operations for the invalid/expired state transition messages.
             continue;
           }
-
-          stateTransitionHandlers
-              .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
-                  createHandler);
-          stateTransitionContexts
-              .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
-                  msgWorkingContext);
         } else {
-          nonStateTransitionHandlers.add(createHandler);
+          // Need future process non state transition messages by triggering the handler
+          nonStateTransitionHandlers.add(msgHandler);
           nonStateTransitionContexts.add(msgWorkingContext);
         }
       } catch (Exception e) {
         handleUnprocessableMessage(message, e, e.getMessage(), accessor, instanceName, manager);
         continue;
       }
 
-      markReadMessage(message, msgWorkingContext, manager);
-      readMsgs.add(message);
-

Review comment:
       Nothing is being removed.






[GitHub] [helix] kaisun2000 commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
kaisun2000 commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513794200



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkForNoOpMessage(Message message, String instanceName,

Review comment:
       This does more than check for NoOp messages; it also handles other cases, such as expired messages and the cancellation logic. Shall we change the name of this one?
   
   More importantly, any exception thrown here without the statemodel being marked as ERROR can cause the controller to send another message repeatedly. Can you add this invariant to the comment? That way, someone unfamiliar with this code can understand the intention later.
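To make the point about the method's scope concrete, here is a minimal sketch of the control flow being discussed, loosely modeled on the checks removed from `onMessage()` in this diff. `Msg`, `discarded`, and the class name are hypothetical stand-ins rather than Helix types, and only a subset of the original checks is shown (the session-change and cancellation branches are omitted). The catch block mirrors the removed code, which deleted the message when any check threw.

```java
import java.util.ArrayList;
import java.util.List;

public class NoOpCheckSketch {
  /** Hypothetical, simplified stand-in for a message; not the Helix Message class. */
  public static final class Msg {
    final String id;
    final String type;
    final String tgtSessionId;
    final boolean isNew;
    final boolean expired;

    public Msg(String id, String type, String tgtSessionId, boolean isNew, boolean expired) {
      this.id = id;
      this.type = type;
      this.tgtSessionId = tgtSessionId;
      this.isNew = isNew;
      this.expired = expired;
    }
  }

  /** Ids of dropped messages; stands in for the report-and-remove step. */
  public static final List<String> discarded = new ArrayList<>();

  /** Returns true when the message needs no further processing in this callback. */
  public static boolean checkAndProcessNoOpMessage(Msg m, String sessionId) {
    try {
      if (m.type.equalsIgnoreCase("NO_OP")) {
        discarded.add(m.id); // NO_OP messages are simply dropped
        return true;
      }
      if (!sessionId.equals(m.tgtSessionId) && !m.tgtSessionId.equals("*")) {
        discarded.add(m.id); // session mismatch: message targets another session
        return true;
      }
      if (!m.isNew) {
        return true; // already read; skipped but left in place
      }
      if (m.expired) {
        discarded.add(m.id); // expired messages are dropped
        return true;
      }
      return false; // the caller continues with handler creation
    } catch (Exception e) {
      // Mirrors the removed catch block: a failed check drops the message.
      discarded.add(m.id);
      return true;
    }
  }
}
```

In this sketch a message with a null target session id triggers a `NullPointerException` inside the checks, and the catch block drops it instead of letting the exception escape to the caller.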






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513942183



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
+      NotificationContext changeContext, HelixManager manager, String sessionId,
+      Map<String, MessageHandler> stateTransitionHandlers) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();

Review comment:
       One more reason not to change it: moving this line into the try-catch block changes the business logic.
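A small illustration of why the position of that line matters, written as a sketch with hypothetical names (`TryScopeSketch`, `removed`, and the `Supplier` standing in for `manager.getHelixDataAccessor()`), not the actual Helix code. When the accessor lookup sits outside the try, a failure propagates to the caller and the cleanup in the catch block never runs; once it is moved inside, the same failure is swallowed and the message is removed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TryScopeSketch {
  /** Ids of removed messages; stands in for the cleanup done in the catch block. */
  public static final List<String> removed = new ArrayList<>();

  /** Lookup OUTSIDE the try: a failing lookup propagates and nothing is removed. */
  public static boolean fetchOutsideTry(Supplier<Object> accessorSupplier, String msgId) {
    Object accessor = accessorSupplier.get();
    try {
      return accessor == null;
    } catch (RuntimeException e) {
      removed.add(msgId);
      return true;
    }
  }

  /** Lookup INSIDE the try: the same failure is caught and the message is removed. */
  public static boolean fetchInsideTry(Supplier<Object> accessorSupplier, String msgId) {
    try {
      Object accessor = accessorSupplier.get();
      return accessor == null;
    } catch (RuntimeException e) {
      removed.add(msgId);
      return true;
    }
  }
}
```

The two methods differ only in where the lookup happens, yet a caller sees an exception in one case and a silently removed message in the other, which is the behavior change the comment refers to.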






[GitHub] [helix] jiajunwang merged pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang merged pull request #1489:
URL: https://github.com/apache/helix/pull/1489


   




[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513732287



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -192,43 +191,38 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadpoolSize) {
-    if (factory instanceof  MultiTypeMessageHandlerFactory) {
+    if (factory instanceof MultiTypeMessageHandlerFactory) {
       if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
         throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
             + ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
       }
     } else {
       if (!factory.getMessageType().equals(type)) {
         throw new HelixException(
-            "Message factory type mismatch. Type: " + type + ", factory: " + factory.getMessageType());
+            "Message factory type mismatch. Type: " + type + ", factory: " + factory
+                .getMessageType());

Review comment:
       This PR is all about non-essential changes that prepare for the real business logic changes later.
   Non-essential does not mean we are not going to make them.






[GitHub] [helix] kaisun2000 commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
kaisun2000 commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513842032



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkForNoOpMessage(Message message, String instanceName,

Review comment:
       Right now, this code is actually inside a big try{} block, and the catch block removes the message. I know that in the next diff you will do more magic to make sure that when this method exits, there must be a statemodel with some state. Currently there is no logic error (probably the only issue, if one is being super picky, is the first line, getHelixDataAccessor()). However, stating this invariant would help later developers understand the reasoning here, so I think it would be a good idea to have a comment here.






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513814394



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkForNoOpMessage(Message message, String instanceName,

Review comment:
       Changed to checkAndProcessNoOpMessage






[GitHub] [helix] kaisun2000 commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
kaisun2000 commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513794632



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -810,164 +799,52 @@ public void onMessage(String instanceName, List<Message> messages,
     Set<String> createCurStateNames = new HashSet<>();
 
     for (Message message : messages) {
-      try {
-        // nop messages are simply removed. It is used to trigger onMessage() in
-        // situations such as register a new message handler factory
-        if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) {
-          LOG.info(
-              "Dropping NO-OP message. mid: " + message.getId() + ", from: " + message.getMsgSrc());
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-          continue;
-        }
-
-        String tgtSessionId = message.getTgtSessionId();
-        // sessionId mismatch normally means message comes from expired session, just remove it
-        if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
-          String warningMessage =
-              "SessionId does NOT match. expected sessionId: " + sessionId
-                  + ", tgtSessionId in message: " + tgtSessionId + ", messageId: "
-                  + message.getMsgId();
-          LOG.warn(warningMessage);
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-          _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, manager);
-
-          // Proactively send a session sync message from participant to controller
-          // upon session mismatch after a new session is established
-          if (manager.getInstanceType() == InstanceType.PARTICIPANT
-              || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
-            if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
-              syncSessionToController(manager);
-            }
-          }
-          continue;
-        }
-
-        if ((manager.getInstanceType() == InstanceType.CONTROLLER
-            || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-            && MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
-          LOG.info(String.format("Controller received PARTICIPANT_SESSION_CHANGE msg from src: %s",
-              message.getMsgSrc()));
-          PropertyKey key = new Builder(manager.getClusterName()).liveInstances();
-          List<LiveInstance> liveInstances =
-              manager.getHelixDataAccessor().getChildValues(key, true);
-          _controller.onLiveInstanceChange(liveInstances, changeContext);
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED);
-          continue;
-        }
-
-        // don't process message that is of READ or UNPROCESSABLE state
-        if (MessageState.NEW != message.getMsgState()) {
-          // It happens because we don't delete message right after
-          // read. Instead we keep it until the current state is updated.
-          // We will read the message again if there is a new message but we
-          // check for the status and ignore if its already read
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Message already read. msgId: " + message.getMsgId());
-          }
-          continue;
-        }
-
-        if (message.isExpired()) {
-          LOG.info(
-              "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc()
-                  + " relayed from: " + message.getRelaySrcHost());
-          reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-          continue;
-        }
-
-        // State Transition Cancellation
-        if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
-          boolean success = cancelNotStartedStateTransition(message, stateTransitionHandlers, accessor, instanceName);
-          if (success) {
-            continue;
-          }
-        }
-
-        _monitor.reportReceivedMessage(message);
-      } catch (Exception e) {
-        LOG.error("Failed to process the message {}. Deleting the message from ZK. Exception: {}",
-            message, e);
-        removeMessageFromTaskAndFutureMap(message);
-        removeMessageFromZK(accessor, message, instanceName);
+      if (checkForNoOpMessage(message, instanceName, changeContext, manager, sessionId,
+          stateTransitionHandlers)) {
+        // skip the following operations for the no-op messages.
         continue;
       }
-
       // create message handlers, if handlers not found, leave its state as NEW
       NotificationContext msgWorkingContext = changeContext.clone();
       try {
-        MessageHandler createHandler = createMessageHandler(message, msgWorkingContext);
-        if (createHandler == null) {
+        MessageHandler msgHandler = createMessageHandler(message, msgWorkingContext);
+        if (msgHandler == null) {
+          // Failed to create message handler, skip processing this message in this callback.
+          // The same message process will be retried in the next round.
           continue;
         }
         if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
             .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
-          String messageTarget =
-              getMessageTarget(message.getResourceName(), message.getPartitionName());
-
-          if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
-              && isStateTransitionInProgress(messageTarget)) {
-
-            String taskId = _messageTaskMap.get(messageTarget);
-            Message msg = _taskMap.get(taskId).getTask().getMessage();
-
-            // If there is another state transition for same partition is going on,
-            // discard the message. Controller will resend if this is a valid message
-            String errMsg = String.format(
-                "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
-                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
-                System.currentTimeMillis(), message.getFromState(), message.getToState());
-            handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
-                instanceName, manager);
-            continue;
-          }
-          if (createHandler instanceof HelixStateTransitionHandler) {
-            // We only check to state if there is no ST task scheduled/executing.
-            HelixStateTransitionHandler.StaleMessageValidateResult result =
-                ((HelixStateTransitionHandler) createHandler).staleMessageValidator();
-            if (!result.isValid) {
-              handleUnprocessableMessage(message, null /* exception */,
-                  result.exception.getMessage(), accessor, instanceName, manager);
-              continue;
-            }
-          }
-          if (stateTransitionHandlers.containsKey(messageTarget)) {
-            // If there are 2 messages in same batch about same partition's state transition,
-            // the later one is discarded
-            Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
-            String errMsg = String.format(
-                "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
-                message.getMsgId(), duplicatedMessage.getFromState(),
-                duplicatedMessage.getToState(), message.getFromState(), message.getToState());
-            handleUnprocessableMessage(message, null /* exception */, errMsg, accessor,
-                instanceName, manager);
+          if (validateStateTransitionMessage(message, instanceName, manager,
+              stateTransitionHandlers, msgHandler)) {
+            // Need future process by triggering state transition
+            String msgTarget =
+                getMessageTarget(message.getResourceName(), message.getPartitionName());
+            stateTransitionHandlers.put(msgTarget, msgHandler);
+            stateTransitionContexts.put(msgTarget, msgWorkingContext);
+          } else {
+            // skip the following operations for the invalid/expired state transition messages.
             continue;
           }
-
-          stateTransitionHandlers
-              .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
-                  createHandler);
-          stateTransitionContexts
-              .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
-                  msgWorkingContext);
         } else {
-          nonStateTransitionHandlers.add(createHandler);
+          // Need future process non state transition messages by triggering the handler
+          nonStateTransitionHandlers.add(msgHandler);
           nonStateTransitionContexts.add(msgWorkingContext);
         }
       } catch (Exception e) {
         handleUnprocessableMessage(message, e, e.getMessage(), accessor, instanceName, manager);
         continue;
       }
 
-      markReadMessage(message, msgWorkingContext, manager);
-      readMsgs.add(message);
-

Review comment:
       Why remove these two lines?






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513941491



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkForNoOpMessage(Message message, String instanceName,

Review comment:
       Depending on the solution, we might need to comment on this logic from a different perspective. I'm not yet convinced that we should return the statemodel. Also, the logic here is not only for state transition messages, so a comment here would introduce more confusion.
   I prefer to make the existing logic clean enough first. I will add a comment if necessary when working on the real fix.






[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513810797



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkForNoOpMessage(Message message, String instanceName,

Review comment:
       Any suggestion regarding the name? I feel "checkAndProcessNoOpMessage" is too long. But maybe that's what we need.
   
   Regarding your second comment, could you please point out the code that could throw an exception? I will take care of that in the following PR. We won't change any business logic in this one.






[GitHub] [helix] kaisun2000 commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
kaisun2000 commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r514547303



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
+      NotificationContext changeContext, HelixManager manager, String sessionId,
+      Map<String, MessageHandler> stateTransitionHandlers) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();

Review comment:
       ```
     public HelixDataAccessor getHelixDataAccessor() {
       checkConnected(_waitForConnectedTimeout);
       return _dataAccessor;
     }
   ```
   
   If the _zkClient is somehow closed, the above code would throw an exception, which would cause the same issue of the controller repeatedly sending messages.
   
   Note that with the new change to statusUpdateUtil, we won't really use the _zkClient from ZKHelixManager to write to ZooKeeper.
   
   In sum, I think the core idea (the invariant) is that this method should not throw anything. Anyway, up to you.
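
To illustrate the invariant above, here is a minimal sketch, not the actual HelixTaskExecutor code. It assumes a hypothetical `Accessor` interface and a hypothetical three-state `Result` so that the sketch does not have to pick a failure policy; the only point is that the accessor lookup sits inside the `try` block, so a failure there cannot escape the method.

```java
import java.util.function.Supplier;

// Hypothetical sketch: none of these names exist in Helix.
class NoThrowCheckSketch {

  /** Hypothetical stand-in for whatever inspects the message once the accessor is available. */
  interface Accessor {
    boolean isNoOp(String messageId);
  }

  /** Hypothetical outcome type; the real method in the PR returns a boolean. */
  enum Result { NO_OP, NEEDS_PROCESSING, FAILED }

  /**
   * Inspects a message without ever throwing. The accessor is obtained inside the
   * try block, so an exception from the lookup itself is reported as FAILED
   * instead of propagating to the caller.
   */
  static Result check(Supplier<Accessor> accessorSource, String messageId) {
    try {
      Accessor accessor = accessorSource.get(); // may throw, e.g. if the connection is closed
      return accessor.isNoOp(messageId) ? Result.NO_OP : Result.NEEDS_PROCESSING;
    } catch (Exception e) {
      // How a failure should be handled is left to the caller (assumption of this sketch).
      System.err.println("Failed to inspect message " + messageId + ": " + e);
      return Result.FAILED;
    }
  }

  public static void main(String[] args) {
    // Simulates an accessor lookup that fails, as it might when the client is closed.
    Result failed = check(() -> { throw new IllegalStateException("client closed"); }, "msg-1");
    // Simulates a healthy lookup whose accessor reports the message as a no-op.
    Result noOp = check(() -> id -> true, "msg-2");
    System.out.println(failed + " " + noOp);
  }
}
```

Whether a failed inspection should drop the message, retry it, or mark it as an error is a separate decision that this sketch deliberately leaves open.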






[GitHub] [helix] kaisun2000 commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
kaisun2000 commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513854038



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -997,30 +873,186 @@ public void onMessage(String instanceName, List<Message> messages,
       }
     }
 
-    // update message state to READ in batch and schedule all read messages
+    // update message state to READ in batch and schedule tasks for all read messages
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
-        }
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
-        Message msg = handler._message;
-        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
-          removeMessageFromTaskAndFutureMap(msg);
-          removeMessageFromZK(accessor, msg, instanceName);
+        scheduleTaskForMessage(instanceName, accessor, handler, context);
+      }
+    }
+  }
+
+  /**
+   * Inspect the message. Report and remove it if no operation needs to be done.
+   * @param message
+   * @param instanceName
+   * @param changeContext
+   * @param manager
+   * @param sessionId
+   * @param stateTransitionHandlers
+   * @return True if the message is no-op message and no other process step is required.
+   */
+  private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
+      NotificationContext changeContext, HelixManager manager, String sessionId,
+      Map<String, MessageHandler> stateTransitionHandlers) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();

Review comment:
       Nit: put this line inside the try {} block as well.






[GitHub] [helix] jiajunwang commented on pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on pull request #1489:
URL: https://github.com/apache/helix/pull/1489#issuecomment-719008976


   I discussed the roadmap with the team. The remaining comment does not fit the scope of this work, and the specific case mentioned in this PR won't cause trouble. So I will move forward with Kai's approval.




[GitHub] [helix] jiajunwang commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513809494



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -192,43 +191,38 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadpoolSize) {
-    if (factory instanceof  MultiTypeMessageHandlerFactory) {
+    if (factory instanceof MultiTypeMessageHandlerFactory) {
       if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
         throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
             + ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
       }
     } else {
       if (!factory.getMessageType().equals(type)) {
         throw new HelixException(
-            "Message factory type mismatch. Type: " + type + ", factory: " + factory.getMessageType());
+            "Message factory type mismatch. Type: " + type + ", factory: " + factory
+                .getMessageType());
       }
     }
 
     _isShuttingDown = false;
 
-    MsgHandlerFactoryRegistryItem newItem = new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
+    MsgHandlerFactoryRegistryItem newItem =
+        new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
     MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem);
     if (prevItem == null) {
-      ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize, new ThreadFactory() {
-        @Override public Thread newThread(Runnable r) {
-          return new Thread(r, "HelixTaskExecutor-message_handle_thread_" + thread_uid.getAndIncrement());
-        }
-      });
-      ExecutorService prevExecutor = _executorMap.putIfAbsent(type, newPool);
-      if (prevExecutor != null) {
-        LOG.warn("Skip creating a new thread pool for type: " + type + ", already existing pool: "
-            + prevExecutor + ", isShutdown: " + prevExecutor.isShutdown());
-        newPool.shutdown();
-        newPool = null;
-      } else {
+      _executorMap.computeIfAbsent(type, msgType -> {

Review comment:
       putIfAbsent is not a good fit here, since you need to create the new pool, check the result, and then shut down the new pool if one already exists.
   This is what I optimized here.
   
   computeIfAbsent won't trigger the compute callback if the key already exists.
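
The difference can be seen in a small standalone sketch. It is not the HelixTaskExecutor code: the class name, the `POOLS_CREATED` counter, and the `EXAMPLE_TYPE` key are hypothetical and exist only to count how many pools each pattern constructs when the same type is registered twice.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch comparing the two registration patterns.
class PoolRegistrySketch {
  // Counts pool constructions, for illustration only.
  static final AtomicInteger POOLS_CREATED = new AtomicInteger();

  static ExecutorService newPool(int size) {
    POOLS_CREATED.incrementAndGet();
    return Executors.newFixedThreadPool(size);
  }

  // Old pattern: build the pool first, then shut it down if the key was already mapped.
  static ExecutorService registerWithPutIfAbsent(
      ConcurrentHashMap<String, ExecutorService> pools, String type, int size) {
    ExecutorService created = newPool(size);
    ExecutorService existing = pools.putIfAbsent(type, created);
    if (existing != null) {
      created.shutdown();
      return existing;
    }
    return created;
  }

  // New pattern: the mapping function runs only when the key is absent.
  static ExecutorService registerWithComputeIfAbsent(
      ConcurrentHashMap<String, ExecutorService> pools, String type, int size) {
    return pools.computeIfAbsent(type, msgType -> newPool(size));
  }

  public static void main(String[] args) {
    ConcurrentHashMap<String, ExecutorService> pools = new ConcurrentHashMap<>();
    registerWithPutIfAbsent(pools, "EXAMPLE_TYPE", 2);
    registerWithPutIfAbsent(pools, "EXAMPLE_TYPE", 2); // builds and discards a second pool
    int eager = POOLS_CREATED.getAndSet(0);

    pools.values().forEach(ExecutorService::shutdown);
    pools.clear();

    registerWithComputeIfAbsent(pools, "EXAMPLE_TYPE", 2);
    registerWithComputeIfAbsent(pools, "EXAMPLE_TYPE", 2); // callback is skipped
    int lazy = POOLS_CREATED.get();
    pools.values().forEach(ExecutorService::shutdown);

    System.out.println("putIfAbsent built " + eager + ", computeIfAbsent built " + lazy);
  }
}
```

On `ConcurrentHashMap`, `computeIfAbsent` also applies the mapping function at most once per key, so concurrent registrations of the same type should not create a throwaway pool either.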






[GitHub] [helix] kaisun2000 commented on a change in pull request #1489: Refine the HelixTaskExecutor to reduce duplicate code and clarify the code structure.

Posted by GitBox <gi...@apache.org>.
kaisun2000 commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513731065



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -192,43 +191,38 @@ public void registerMessageHandlerFactory(String type, MessageHandlerFactory fac
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadpoolSize) {
-    if (factory instanceof  MultiTypeMessageHandlerFactory) {
+    if (factory instanceof MultiTypeMessageHandlerFactory) {
       if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
         throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
             + ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
       }
     } else {
       if (!factory.getMessageType().equals(type)) {
         throw new HelixException(
-            "Message factory type mismatch. Type: " + type + ", factory: " + factory.getMessageType());
+            "Message factory type mismatch. Type: " + type + ", factory: " + factory
+                .getMessageType());

Review comment:
       All the above changes seem to be non-essential, right?



