You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:05 UTC

[30/38] helix git commit: Add more messaging metrics to participant.

Add more messaging metrics to participant.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a294ab2d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a294ab2d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a294ab2d

Branch: refs/heads/helix-0.6.x
Commit: a294ab2dc61ba7afe089e64fc30b21fc375c9e29
Parents: 3f07be0
Author: Boyan Li <bo...@linkedin.com>
Authored: Mon Sep 19 11:42:11 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:54:03 2017 -0800

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTask.java     |  9 +++-
 .../messaging/handling/HelixTaskExecutor.java   | 11 ++--
 .../monitoring/ParticipantStatusMonitor.java    | 26 ++++++++-
 .../mbeans/ParticipantMessageMonitor.java       | 55 +++++++++++++++++++-
 .../mbeans/ParticipantMessageMonitorMBean.java  |  4 ++
 5 files changed, 97 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index cc4123a..1aa932e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -35,6 +35,7 @@ import org.apache.helix.model.Message.Attributes;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.monitoring.StateTransitionContext;
 import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
@@ -100,6 +101,7 @@ public class HelixTask implements MessageTask {
               + " type: " + _message.getMsgType();
       logger.error(errorMessage, e);
       _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
+      _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
     }
 
     // cancel timeout task
@@ -108,9 +110,10 @@ public class HelixTask implements MessageTask {
     Exception exception = null;
     try {
       if (taskResult.isSuccess()) {
-        _statusUpdateUtil.logInfo(_message, _handler.getClass(),
-            "Message handling task completed successfully", accessor);
+        _statusUpdateUtil
+            .logInfo(_message, _handler.getClass(), "Message handling task completed successfully", accessor);
         logger.info("Message " + _message.getMsgId() + " completed.");
+        _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
       } else {
         type = ErrorType.INTERNAL;
 
@@ -142,6 +145,7 @@ public class HelixTask implements MessageTask {
           logger.error(errorMsg);
           _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
         }
+        _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
       }
 
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
@@ -160,6 +164,7 @@ public class HelixTask implements MessageTask {
           "Exception after executing a message, msgId: " + _message.getMsgId() + e;
       logger.error(errorMessage, e);
       _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
+      _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
     } finally {
       long end = System.currentTimeMillis();
       logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"

http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index ea3646d..d68b272 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -34,7 +34,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixConstants;
@@ -59,6 +58,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.monitoring.ParticipantStatusMonitor;
 import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -595,7 +595,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     // Update message count
     _messageQueueMonitor.setMessageQueueBacklog(messages.size());
-    _monitor.incrementReceivedMessages(messages.size());
+    _monitor.reportReceivedMessages(messages);
 
     // sort message by creation timestamp, so message created earlier is processed first
     Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
@@ -623,6 +623,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: "
             + message.getMsgSrc());
         accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
         continue;
       }
 
@@ -646,6 +647,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
             syncSessionToController(manager);
           }
         }
+        _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
         continue;
       }
 
@@ -656,6 +658,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key);
         _controller.onLiveInstanceChange(liveInstances, changeContext);
         accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
         continue;
       }
 
@@ -668,6 +671,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Message already read. msgId: " + message.getMsgId());
         }
+        _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
         continue;
       }
 
@@ -675,6 +679,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       try {
         MessageHandler createHandler = createMessageHandler(message, changeContext);
         if (createHandler == null) {
+          _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
           continue;
         }
         handlers.add(createHandler);
@@ -688,7 +693,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         message.setMsgState(MessageState.UNPROCESSABLE);
         accessor.removeProperty(message.getKey(keyBuilder, instanceName));
         LOG.error("Message cannot be processed: " + message.getRecord(), e);
-
+        _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
index 31e8fb6..75ce14a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
@@ -20,6 +20,7 @@ package org.apache.helix.monitoring;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -27,6 +28,7 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.monitoring.mbeans.StateTransitionStatMonitor;
 import org.apache.log4j.Logger;
@@ -53,9 +55,29 @@ public class ParticipantStatusMonitor {
     }
   }
 
-  public void incrementReceivedMessages(int num) {
+  public void reportReceivedMessages(List<Message> messages) {
     if (_messageMonitor != null) {  // is participant
-      _messageMonitor.incrementReceivedMessages(num);
+      _messageMonitor.incrementReceivedMessages(messages.size());
+      _messageMonitor.incrementPendingMessages(messages.size());
+    }
+  }
+
+  public void reportProcessedMessage(Message message, ParticipantMessageMonitor.ProcessedMessageState processedMessageState) {
+    if (_messageMonitor != null) {  // is participant
+      switch (processedMessageState) {
+        case DISCARDED:
+          _messageMonitor.incrementDiscardedMessages(1);
+          _messageMonitor.decrementPendingMessages(1);
+          break;
+        case FAILED:
+          _messageMonitor.incrementFailedMessages(1);
+          _messageMonitor.decrementPendingMessages(1);
+          break;
+        case COMPLETED:
+          _messageMonitor.incrementCompletedMessages(1);
+          _messageMonitor.decrementPendingMessages(1);
+          break;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index d7d6fab..f1b5bec 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -2,10 +2,23 @@ package org.apache.helix.monitoring.mbeans;
 
 public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean {
 
+  /**
+   * The current processed state of the message
+   */
+  public enum ProcessedMessageState {
+    DISCARDED,
+    FAILED,
+    COMPLETED
+  }
+
   private static final String PARTICIPANT_KEY = "ParticipantName";
   private static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus";
   private final String _participantName;
   private long _receivedMessages = 0;
+  private long _discardedMessages = 0;
+  private long _completedMessages = 0;
+  private long _failedMessages = 0;
+  private long _pendingMessages = 0;
 
   public ParticipantMessageMonitor(String participantName) {
     _participantName = participantName;
@@ -16,7 +29,27 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
   }
 
   public void incrementReceivedMessages(int count) {
-    _receivedMessages+=count;
+    _receivedMessages += count;
+  }
+
+  public void incrementDiscardedMessages(int count) {
+    _discardedMessages += count;
+  }
+
+  public void incrementCompletedMessages(int count) {
+    _completedMessages += count;
+  }
+
+  public void incrementFailedMessages(int count) {
+    _failedMessages += count;
+  }
+
+  public void incrementPendingMessages(int count) {
+    _pendingMessages += count;
+  }
+
+  public void decrementPendingMessages(int count) {
+    _pendingMessages -= count;
   }
 
   @Override
@@ -25,6 +58,26 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
   }
 
   @Override
+  public long getDiscardedMessages() {
+    return _discardedMessages;
+  }
+
+  @Override
+  public long getCompletedMessages() {
+    return _completedMessages;
+  }
+
+  @Override
+  public long getFailedMessages() {
+    return _failedMessages;
+  }
+
+  @Override
+  public long getPendingMessages() {
+    return _pendingMessages;
+  }
+
+  @Override
   public String getSensorName() {
     return PARTICIPANT_STATUS_KEY + "." + "_participantName";
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
index cfbc755..c7d4346 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
@@ -5,4 +5,8 @@ import org.apache.helix.monitoring.SensorNameProvider;
 
 public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
   public long getReceivedMessages();
+  public long getDiscardedMessages();
+  public long getCompletedMessages();
+  public long getFailedMessages();
+  public long getPendingMessages();
 }
\ No newline at end of file