You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:05 UTC
[30/38] helix git commit: Add more messaging metrics to participant.
Add more messaging metrics to participant.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a294ab2d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a294ab2d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a294ab2d
Branch: refs/heads/helix-0.6.x
Commit: a294ab2dc61ba7afe089e64fc30b21fc375c9e29
Parents: 3f07be0
Author: Boyan Li <bo...@linkedin.com>
Authored: Mon Sep 19 11:42:11 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:54:03 2017 -0800
----------------------------------------------------------------------
.../helix/messaging/handling/HelixTask.java | 9 +++-
.../messaging/handling/HelixTaskExecutor.java | 11 ++--
.../monitoring/ParticipantStatusMonitor.java | 26 ++++++++-
.../mbeans/ParticipantMessageMonitor.java | 55 +++++++++++++++++++-
.../mbeans/ParticipantMessageMonitorMBean.java | 4 ++
5 files changed, 97 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index cc4123a..1aa932e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -35,6 +35,7 @@ import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;
@@ -100,6 +101,7 @@ public class HelixTask implements MessageTask {
+ " type: " + _message.getMsgType();
logger.error(errorMessage, e);
_statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
+ _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
}
// cancel timeout task
@@ -108,9 +110,10 @@ public class HelixTask implements MessageTask {
Exception exception = null;
try {
if (taskResult.isSuccess()) {
- _statusUpdateUtil.logInfo(_message, _handler.getClass(),
- "Message handling task completed successfully", accessor);
+ _statusUpdateUtil
+ .logInfo(_message, _handler.getClass(), "Message handling task completed successfully", accessor);
logger.info("Message " + _message.getMsgId() + " completed.");
+ _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
} else {
type = ErrorType.INTERNAL;
@@ -142,6 +145,7 @@ public class HelixTask implements MessageTask {
logger.error(errorMsg);
_statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
}
+ _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
}
if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
@@ -160,6 +164,7 @@ public class HelixTask implements MessageTask {
"Exception after executing a message, msgId: " + _message.getMsgId() + e;
logger.error(errorMessage, e);
_statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
+ _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
} finally {
long end = System.currentTimeMillis();
logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index ea3646d..d68b272 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -34,7 +34,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.Criteria;
import org.apache.helix.HelixConstants;
@@ -59,6 +58,7 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.monitoring.ParticipantStatusMonitor;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -595,7 +595,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// Update message count
_messageQueueMonitor.setMessageQueueBacklog(messages.size());
- _monitor.incrementReceivedMessages(messages.size());
+ _monitor.reportReceivedMessages(messages);
// sort message by creation timestamp, so message created earlier is processed first
Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
@@ -623,6 +623,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: "
+ message.getMsgSrc());
accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+ _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
continue;
}
@@ -646,6 +647,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
syncSessionToController(manager);
}
}
+ _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
continue;
}
@@ -656,6 +658,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key);
_controller.onLiveInstanceChange(liveInstances, changeContext);
accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+ _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
continue;
}
@@ -668,6 +671,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
if (LOG.isTraceEnabled()) {
LOG.trace("Message already read. msgId: " + message.getMsgId());
}
+ _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
continue;
}
@@ -675,6 +679,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
try {
MessageHandler createHandler = createMessageHandler(message, changeContext);
if (createHandler == null) {
+ _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
continue;
}
handlers.add(createHandler);
@@ -688,7 +693,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
message.setMsgState(MessageState.UNPROCESSABLE);
accessor.removeProperty(message.getKey(keyBuilder, instanceName));
LOG.error("Message cannot be processed: " + message.getRecord(), e);
-
+ _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
continue;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
index 31e8fb6..75ce14a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
@@ -20,6 +20,7 @@ package org.apache.helix.monitoring;
*/
import java.lang.management.ManagementFactory;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -27,6 +28,7 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.monitoring.mbeans.StateTransitionStatMonitor;
import org.apache.log4j.Logger;
@@ -53,9 +55,29 @@ public class ParticipantStatusMonitor {
}
}
- public void incrementReceivedMessages(int num) {
+ public void reportReceivedMessages(List<Message> messages) {
if (_messageMonitor != null) { // is participant
- _messageMonitor.incrementReceivedMessages(num);
+ _messageMonitor.incrementReceivedMessages(messages.size());
+ _messageMonitor.incrementPendingMessages(messages.size());
+ }
+ }
+
+ public void reportProcessedMessage(Message message, ParticipantMessageMonitor.ProcessedMessageState processedMessageState) {
+ if (_messageMonitor != null) { // is participant
+ switch (processedMessageState) {
+ case DISCARDED:
+ _messageMonitor.incrementDiscardedMessages(1);
+ _messageMonitor.decrementPendingMessages(1);
+ break;
+ case FAILED:
+ _messageMonitor.incrementFailedMessages(1);
+ _messageMonitor.decrementPendingMessages(1);
+ break;
+ case COMPLETED:
+ _messageMonitor.incrementCompletedMessages(1);
+ _messageMonitor.decrementPendingMessages(1);
+ break;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index d7d6fab..f1b5bec 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -2,10 +2,23 @@ package org.apache.helix.monitoring.mbeans;
public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean {
+ /**
+ * The current processed state of the message
+ */
+ public enum ProcessedMessageState {
+ DISCARDED,
+ FAILED,
+ COMPLETED
+ }
+
private static final String PARTICIPANT_KEY = "ParticipantName";
private static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus";
private final String _participantName;
private long _receivedMessages = 0;
+ private long _discardedMessages = 0;
+ private long _completedMessages = 0;
+ private long _failedMessages = 0;
+ private long _pendingMessages = 0;
public ParticipantMessageMonitor(String participantName) {
_participantName = participantName;
@@ -16,7 +29,27 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
}
public void incrementReceivedMessages(int count) {
- _receivedMessages+=count;
+ _receivedMessages += count;
+ }
+
+ public void incrementDiscardedMessages(int count) {
+ _discardedMessages += count;
+ }
+
+ public void incrementCompletedMessages(int count) {
+ _completedMessages += count;
+ }
+
+ public void incrementFailedMessages(int count) {
+ _failedMessages += count;
+ }
+
+ public void incrementPendingMessages(int count) {
+ _pendingMessages += count;
+ }
+
+ public void decrementPendingMessages(int count) {
+ _pendingMessages -= count;
}
@Override
@@ -25,6 +58,26 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
}
@Override
+ public long getDiscardedMessages() {
+ return _discardedMessages;
+ }
+
+ @Override
+ public long getCompletedMessages() {
+ return _completedMessages;
+ }
+
+ @Override
+ public long getFailedMessages() {
+ return _failedMessages;
+ }
+
+ @Override
+ public long getPendingMessages() {
+ return _pendingMessages;
+ }
+
+ @Override
public String getSensorName() {
return PARTICIPANT_STATUS_KEY + "." + "_participantName";
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a294ab2d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
index cfbc755..c7d4346 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
@@ -5,4 +5,8 @@ import org.apache.helix.monitoring.SensorNameProvider;
public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
public long getReceivedMessages();
+ public long getDiscardedMessages();
+ public long getCompletedMessages();
+ public long getFailedMessages();
+ public long getPendingMessages();
}
\ No newline at end of file