You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:50 UTC
[15/38] helix git commit: Add a mbean for participant and emit
received msgs
Add a mbean for participant and emit received msgs
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/821fd04d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/821fd04d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/821fd04d
Branch: refs/heads/helix-0.6.x
Commit: 821fd04dfb1dc66a85d0dfb76e5837a4c8107448
Parents: a1278e1
Author: Boyan Li <bo...@linkedin.com>
Authored: Mon Aug 29 09:47:27 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 19:18:14 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/HelixManager.java | 6 +++
.../apache/helix/manager/zk/ZKHelixManager.java | 15 +++++-
.../messaging/DefaultMessagingService.java | 2 +-
.../messaging/handling/HelixTaskExecutor.java | 8 ++-
.../mbeans/ParticipantStatusMonitor.java | 54 ++++++++++++++++++++
.../mbeans/ParticipantStatusMonitorMBean.java | 8 +++
.../src/test/java/org/apache/helix/Mocks.java | 5 ++
.../controller/stages/DummyClusterManager.java | 5 ++
.../helix/participant/MockZKHelixManager.java | 5 ++
9 files changed, 104 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 7b574aa..76db004 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -25,6 +25,7 @@ import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.spectator.RoutingTableProvider;
@@ -202,6 +203,11 @@ public interface HelixManager {
String getSessionId();
/**
+ * Get the ParticipantStatusMonitor.
+ * @return the ParticipantStatusMonitor; may be null (the mock managers in the test sources
+ *         return null, and HelixTaskExecutor null-checks the monitor before using it), so
+ *         callers should not assume a non-null result
+ */
+ ParticipantStatusMonitor getParticipantStatusMonitor();
+ /**
* The time stamp is always updated when a notification is received. This can
* be used to check if there was any new notification when previous
* notification was being processed. This is updated based on the
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index f9d03c3..f15725a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -67,6 +67,7 @@ import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.AutoFallbackPropertyStore;
@@ -123,7 +124,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
*/
private final StateMachineEngine _stateMachineEngine;
private final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
-
+ private ParticipantStatusMonitor _participantStatusMonitor;
private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
private Long _sessionStartTime;
@@ -200,7 +201,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_version = _properties.getVersion();
_keyBuilder = new Builder(clusterName);
- _messagingService = new DefaultMessagingService(this);
/**
* use system property if available
@@ -222,6 +222,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
switch (instanceType) {
case PARTICIPANT:
_stateMachineEngine = new HelixStateMachineEngine(this);
+ _participantStatusMonitor = new ParticipantStatusMonitor(_instanceName);
_participantHealthInfoCollector =
new ParticipantHealthReportCollectorImpl(this, _instanceName);
_timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
@@ -234,6 +235,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
break;
case CONTROLLER_PARTICIPANT:
_stateMachineEngine = new HelixStateMachineEngine(this);
+ _participantStatusMonitor = new ParticipantStatusMonitor(_instanceName);
_participantHealthInfoCollector =
new ParticipantHealthReportCollectorImpl(this, _instanceName);
@@ -248,6 +250,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
default:
throw new IllegalArgumentException("unrecognized type: " + instanceType);
}
+ // DefaultMessagingService has to be initialized after instance type specific init,
+ // because it depends on ParticipantStatusMonitor
+ _messagingService = new DefaultMessagingService(this);
+
}
private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
@@ -590,6 +596,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
@Override
+ public ParticipantStatusMonitor getParticipantStatusMonitor() {
+ return _participantStatusMonitor;
+ }
+
+ @Override
public boolean isConnected() {
if (_zkclient == null) {
return false;
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index f000f69..1a78cb8 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -56,7 +56,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
public DefaultMessagingService(HelixManager manager) {
_manager = manager;
_evaluator = new CriteriaEvaluator();
- _taskExecutor = new HelixTaskExecutor(this);
+ _taskExecutor = new HelixTaskExecutor(this, manager.getParticipantStatusMonitor());
_asyncCallbackService = new AsyncCallbackService();
_taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
_asyncCallbackService);
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 7a1210f..2bb8435 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -59,6 +59,7 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.monitoring.ParticipantMonitor;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -110,6 +111,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
private MessageQueueMonitor _messageQueueMonitor;
private ClusterMessagingService _messagingService;
+ private ParticipantStatusMonitor _participantStatusMonitor;
private GenericHelixController _controller;
private Long _lastSessionSyncTime;
private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
@@ -144,9 +146,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
startMonitorThread();
}
- public HelixTaskExecutor(ClusterMessagingService messagingService) {
+ public HelixTaskExecutor(ClusterMessagingService messagingService, ParticipantStatusMonitor participantStatusMonitor) {
this();
_messagingService = messagingService;
+ _participantStatusMonitor = participantStatusMonitor;
}
@Override
@@ -597,6 +600,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// Update message count
_messageQueueMonitor.setMessageQueueBacklog(messages.size());
+ if (_participantStatusMonitor != null) {
+ _participantStatusMonitor.incrementReceivedMessages(messages.size());
+ }
// sort message by creation timestamp, so message created earlier is processed first
Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
new file mode 100644
index 0000000..e7c6cab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -0,0 +1,54 @@
+package org.apache.helix.monitoring.mbeans;
+
+import java.lang.management.ManagementFactory;
+
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+
+/**
+ * MBean that publishes status counters for a single Helix participant.
+ *
+ * <p>The instance registers itself with the platform MBean server when it is constructed.
+ * The received-message counter is updated through {@link #incrementReceivedMessages(int)}
+ * and read through {@link #getReceivedMessages()}. The reader (for example a JMX client
+ * querying the attribute) may run on a different thread than the writer, so the counter is
+ * kept safe for concurrent access.
+ */
+public class ParticipantStatusMonitor implements ParticipantStatusMonitorMBean {
+  private static final Logger LOG = Logger.getLogger(ParticipantStatusMonitor.class);
+  private static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+  private static final String PARTICIPANT_KEY = "ParticipantName";
+  private final MBeanServer _beanServer;
+  private final String _participantName;
+
+  // volatile: readers must observe the latest value without taking a lock, and a plain
+  // long write is not guaranteed to be atomic on every JVM. Writers serialize through
+  // the synchronized incrementReceivedMessages() so that += cannot lose updates.
+  private volatile long _receivedMessages = 0;
+
+  /**
+   * Creates the monitor and registers it with the platform MBean server.
+   *
+   * <p>A registration failure is logged and otherwise ignored; the object can still be used
+   * as an in-process counter in that case.
+   *
+   * @param participantName name of the participant; used in the MBean name and sensor name
+   */
+  public ParticipantStatusMonitor(String participantName) {
+    _participantName = participantName;
+    _beanServer = ManagementFactory.getPlatformMBeanServer();
+
+    try {
+      LOG.info("Register MBean for participant: " + participantName);
+      _beanServer.registerMBean(this, getObjectName(getParticipantBeanName()));
+    } catch (Exception e) {
+      // Best effort: monitoring must not prevent the participant from starting.
+      LOG.error("Could not register MBean for : " + participantName, e);
+    }
+  }
+
+  /**
+   * @return the total of all counts passed to {@link #incrementReceivedMessages(int)} so far
+   */
+  @Override
+  public long getReceivedMessages() {
+    return _receivedMessages;
+  }
+
+  /**
+   * @return the sensor name, built as {@code ParticipantStatus.<participantName>}
+   */
+  @Override
+  public String getSensorName() {
+    return PARTICIPANT_STATUS_KEY + "." + _participantName;
+  }
+
+  /**
+   * Builds the JMX object name for this monitor under the {@code ParticipantStatus} domain.
+   *
+   * @param name the key property list, e.g. {@code ParticipantName=<participantName>}
+   * @return the object name parsed from {@code "ParticipantStatus: " + name}
+   * @throws MalformedObjectNameException if the resulting string is not a valid object name
+   */
+  public ObjectName getObjectName(String name) throws MalformedObjectNameException {
+    // NOTE(review): the format keeps a space after the colon, which becomes part of the
+    // first key; left unchanged because existing consumers may depend on the exact name.
+    return new ObjectName(String.format("%s: %s", PARTICIPANT_STATUS_KEY, name));
+  }
+
+  /** Returns the key property identifying this participant: {@code ParticipantName=<name>}. */
+  private String getParticipantBeanName() {
+    return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
+  }
+
+  /**
+   * Adds {@code count} to the received-message counter.
+   *
+   * <p>Synchronized so that concurrent callers cannot lose updates.
+   *
+   * @param count number of messages to add
+   */
+  public synchronized void incrementReceivedMessages(int count) {
+    _receivedMessages += count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java
new file mode 100644
index 0000000..33f1680
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitorMBean.java
@@ -0,0 +1,8 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+
+/**
+ * JMX management interface for the per-participant status monitor.
+ */
+public interface ParticipantStatusMonitorMBean extends SensorNameProvider {
+  /**
+   * @return the number of received messages recorded by the monitor so far
+   */
+  long getReceivedMessages();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 9688c0d..d705503 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
@@ -462,6 +463,10 @@ public class Mocks {
return 0L;
}
+ @Override
+ public ParticipantStatusMonitor getParticipantStatusMonitor() {
+ return null;
+ }
}
public static class MockAccessor implements HelixDataAccessor {
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 0167487..becbb09 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -42,6 +42,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -259,4 +260,8 @@ public class DummyClusterManager implements HelixManager {
return 0L;
}
+ @Override
+ public ParticipantStatusMonitor getParticipantStatusMonitor() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/821fd04d/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index db6974e..4a97ba5 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -48,6 +48,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
public class MockZKHelixManager implements HelixManager {
@@ -265,4 +266,8 @@ public class MockZKHelixManager implements HelixManager {
return 0L;
}
+ @Override
+ public ParticipantStatusMonitor getParticipantStatusMonitor() {
+ return null;
+ }
}