You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/11/21 10:56:48 UTC
ignite git commit: IGNITE-6868 Implement new JMX metrics for
TcpCommunicationSpi monitoring
Repository: ignite
Updated Branches:
refs/heads/master 0295518ba -> 58b504136
IGNITE-6868 Implement new JMX metrics for TcpCommunicationSpi monitoring
Signed-off-by: Anton Vinogradov <av...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58b50413
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58b50413
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58b50413
Branch: refs/heads/master
Commit: 58b50413622e0059f889f5df062a0d0169d0456f
Parents: 0295518
Author: Aleksey Plekhanov <Pl...@gmail.com>
Authored: Tue Nov 21 13:56:34 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 21 13:56:34 2017 +0300
----------------------------------------------------------------------
.../tcp/TcpCommunicationMetricsListener.java | 225 +++++++++++++++++++
.../communication/tcp/TcpCommunicationSpi.java | 101 ++++++---
.../tcp/TcpCommunicationSpiMBean.java | 33 +++
.../tcp/TcpCommunicationStatisticsTest.java | 201 +++++++++++++++++
.../IgniteSpiCommunicationSelfTestSuite.java | 3 +
5 files changed, 527 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
new file mode 100644
index 0000000..8981e17
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jsr166.LongAdder8;
+
+/**
+ * Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
+ */
+public class TcpCommunicationMetricsListener implements GridNioMetricsListener{
+ /** Received messages count. */
+ private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
+
+ /** Sent messages count.*/
+ private final LongAdder8 sentMsgsCnt = new LongAdder8();
+
+ /** Received bytes count. */
+ private final LongAdder8 rcvdBytesCnt = new LongAdder8();
+
+ /** Sent bytes count.*/
+ private final LongAdder8 sentBytesCnt = new LongAdder8();
+
+ /** Counter factory. */
+ private static final Callable<LongAdder8> LONG_ADDER_FACTORY = new Callable<LongAdder8>() {
+ @Override public LongAdder8 call() {
+ return new LongAdder8();
+ }
+ };
+
+ /** Received messages count grouped by message type. */
+ private final ConcurrentMap<String, LongAdder8> rcvdMsgsCntByType = new ConcurrentHashMap<>();
+
+ /** Received messages count grouped by sender. */
+ private final ConcurrentMap<String, LongAdder8> rcvdMsgsCntByNode = new ConcurrentHashMap<>();
+
+ /** Sent messages count grouped by message type. */
+ private final ConcurrentMap<String, LongAdder8> sentMsgsCntByType = new ConcurrentHashMap<>();
+
+ /** Sent messages count grouped by receiver. */
+ private final ConcurrentMap<String, LongAdder8> sentMsgsCntByNode = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void onBytesSent(int bytesCnt) {
+ sentBytesCnt.add(bytesCnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBytesReceived(int bytesCnt) {
+ rcvdBytesCnt.add(bytesCnt);
+ }
+
+ /**
+ * Collects statistics for message sent by SPI.
+ *
+ * @param msg Sent message.
+ * @param nodeId Receiver node id.
+ */
+ public void onMessageSent(Message msg, UUID nodeId) {
+ assert msg != null;
+ assert nodeId != null;
+
+ sentMsgsCnt.increment();
+
+ if (msg instanceof GridIoMessage)
+ msg = ((GridIoMessage)msg).message();
+
+ LongAdder8 cntByType = F.addIfAbsent(sentMsgsCntByType, msg.getClass().getSimpleName(), LONG_ADDER_FACTORY);
+ LongAdder8 cntByNode = F.addIfAbsent(sentMsgsCntByNode, nodeId.toString(), LONG_ADDER_FACTORY);
+
+ cntByType.increment();
+ cntByNode.increment();
+ }
+
+ /**
+ * Collects statistics for message received by SPI.
+ *
+ * @param msg Received message.
+ * @param nodeId Sender node id.
+ */
+ public void onMessageReceived(Message msg, UUID nodeId) {
+ assert msg != null;
+ assert nodeId != null;
+
+ rcvdMsgsCnt.increment();
+
+ if (msg instanceof GridIoMessage)
+ msg = ((GridIoMessage)msg).message();
+
+ LongAdder8 cntByType = F.addIfAbsent(rcvdMsgsCntByType, msg.getClass().getSimpleName(), LONG_ADDER_FACTORY);
+ LongAdder8 cntByNode = F.addIfAbsent(rcvdMsgsCntByNode, nodeId.toString(), LONG_ADDER_FACTORY);
+
+ cntByType.increment();
+ cntByNode.increment();
+ }
+
+ /**
+ * Gets sent messages count.
+ *
+ * @return Sent messages count.
+ */
+ public int sentMessagesCount() {
+ return sentMsgsCnt.intValue();
+ }
+
+ /**
+ * Gets sent bytes count.
+ *
+ * @return Sent bytes count.
+ */
+ public long sentBytesCount() {
+ return sentBytesCnt.longValue();
+ }
+
+ /**
+ * Gets received messages count.
+ *
+ * @return Received messages count.
+ */
+ public int receivedMessagesCount() {
+ return rcvdMsgsCnt.intValue();
+ }
+
+ /**
+ * Gets received bytes count.
+ *
+ * @return Received bytes count.
+ */
+ public long receivedBytesCount() {
+ return rcvdBytesCnt.longValue();
+ }
+
+ /**
+ * Copies adder-based counters into a plain map of current values for JMX.
+ *
+ * @param srcStat Counters keyed by message type or node ID.
+ * @return Map with the current value of every counter.
+ */
+ private Map<String, Long> convertStatistics(Map<String, LongAdder8> srcStat) {
+ Map<String, Long> res = U.newHashMap(srcStat.size());
+
+ // Read each adder once; the result is detached from the live counters.
+ for (Map.Entry<String, LongAdder8> e : srcStat.entrySet())
+ res.put(e.getKey(), e.getValue().longValue());
+
+ return res;
+ }
+
+ /**
+ * Gets received messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ public Map<String, Long> receivedMessagesByType() {
+ return convertStatistics(rcvdMsgsCntByType);
+ }
+
+ /**
+ * Gets received messages counts (grouped by node).
+ *
+ * @return Map containing sender nodes and respective counts.
+ */
+ public Map<String, Long> receivedMessagesByNode() {
+ return convertStatistics(rcvdMsgsCntByNode);
+ }
+
+ /**
+ * Gets sent messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ public Map<String, Long> sentMessagesByType() {
+ return convertStatistics(sentMsgsCntByType);
+ }
+
+ /**
+ * Gets sent messages counts (grouped by node).
+ *
+ * @return Map containing receiver nodes and respective counts.
+ */
+ public Map<String, Long> sentMessagesByNode() {
+ return convertStatistics(sentMsgsCntByNode);
+ }
+
+ /**
+ * Zeroes the total counters and clears the per-type and per-node statistics.
+ */
+ public void resetMetrics() {
+ // Subtract the current sum instead of calling 'reset', which is not
+ // thread-safe according to its javadoc.
+ sentMsgsCnt.add(-sentMsgsCnt.sum());
+ rcvdMsgsCnt.add(-rcvdMsgsCnt.sum());
+ sentBytesCnt.add(-sentBytesCnt.sum());
+ rcvdBytesCnt.add(-rcvdBytesCnt.sum());
+
+ sentMsgsCntByType.clear();
+ rcvdMsgsCntByType.clear();
+ sentMsgsCntByNode.clear();
+ rcvdMsgsCntByNode.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 49425ce..e68797e 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -83,7 +83,6 @@ import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
-import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
@@ -138,7 +137,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
-import org.jsr166.LongAdder8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -696,7 +694,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
else {
- rcvdMsgsCnt.increment();
+ metricsLsnr.onMessageReceived(msg, connKey.nodeId());
if (msg instanceof RecoveryLastReceivedMessage) {
GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
@@ -1111,34 +1109,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** Address resolver. */
private AddressResolver addrRslvr;
- /** Received messages count. */
- private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
-
- /** Sent messages count.*/
- private final LongAdder8 sentMsgsCnt = new LongAdder8();
-
- /** Received bytes count. */
- private final LongAdder8 rcvdBytesCnt = new LongAdder8();
-
- /** Sent bytes count.*/
- private final LongAdder8 sentBytesCnt = new LongAdder8();
-
/** Context initialization latch. */
private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
/** Stopping flag (set to {@code true} when SPI gets stopping signal). */
private volatile boolean stopping;
- /** metrics listener. */
- private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() {
- @Override public void onBytesSent(int bytesCnt) {
- sentBytesCnt.add(bytesCnt);
- }
-
- @Override public void onBytesReceived(int bytesCnt) {
- rcvdBytesCnt.add(bytesCnt);
- }
- };
+ /** Statistics. */
+ private final TcpCommunicationMetricsListener metricsLsnr = new TcpCommunicationMetricsListener();
/** Client connect futures. */
private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts =
@@ -1821,22 +1799,58 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** {@inheritDoc} */
@Override public int getSentMessagesCount() {
- return sentMsgsCnt.intValue();
+ return metricsLsnr.sentMessagesCount();
}
/** {@inheritDoc} */
@Override public long getSentBytesCount() {
- return sentBytesCnt.longValue();
+ return metricsLsnr.sentBytesCount();
}
/** {@inheritDoc} */
@Override public int getReceivedMessagesCount() {
- return rcvdMsgsCnt.intValue();
+ return metricsLsnr.receivedMessagesCount();
}
/** {@inheritDoc} */
@Override public long getReceivedBytesCount() {
- return rcvdBytesCnt.longValue();
+ return metricsLsnr.receivedBytesCount();
+ }
+
+ /**
+ * Gets received messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ public Map<String, Long> getReceivedMessagesByType() {
+ return metricsLsnr.receivedMessagesByType();
+ }
+
+ /**
+ * Gets received messages counts (grouped by node).
+ *
+ * @return Map containing sender nodes and respective counts.
+ */
+ public Map<String, Long> getReceivedMessagesByNode() {
+ return metricsLsnr.receivedMessagesByNode();
+ }
+
+ /**
+ * Gets sent messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ public Map<String, Long> getSentMessagesByType() {
+ return metricsLsnr.sentMessagesByType();
+ }
+
+ /**
+ * Gets sent messages counts (grouped by receiver node).
+ *
+ * @return Map containing receiver node IDs and respective sent messages counts.
+ */
+ public Map<String, Long> getSentMessagesByNode() {
+ return metricsLsnr.sentMessagesByNode();
}
/** {@inheritDoc} */
@@ -1848,12 +1862,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** {@inheritDoc} */
@Override public void resetMetrics() {
- // Can't use 'reset' method because it is not thread-safe
- // according to javadoc.
- sentMsgsCnt.add(-sentMsgsCnt.sum());
- rcvdMsgsCnt.add(-rcvdMsgsCnt.sum());
- sentBytesCnt.add(-sentBytesCnt.sum());
- rcvdBytesCnt.add(-rcvdBytesCnt.sum());
+ metricsLsnr.resetMetrics();
}
/**
@@ -2607,7 +2616,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
client.release();
if (!retry)
- sentMsgsCnt.increment();
+ metricsLsnr.onMessageSent(msg, node.id());
else {
removeNodeClient(node.id(), client);
@@ -5146,6 +5155,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/** {@inheritDoc} */
+ @Override public Map<String, Long> getReceivedMessagesByType() {
+ return TcpCommunicationSpi.this.metricsLsnr.receivedMessagesByType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Long> getReceivedMessagesByNode() {
+ return TcpCommunicationSpi.this.metricsLsnr.receivedMessagesByNode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Long> getSentMessagesByType() {
+ return TcpCommunicationSpi.this.metricsLsnr.sentMessagesByType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Long> getSentMessagesByNode() {
+ return TcpCommunicationSpi.this.metricsLsnr.sentMessagesByNode();
+ }
+
+ /** {@inheritDoc} */
@Override public int getOutboundMessagesQueueSize() {
return TcpCommunicationSpi.this.getOutboundMessagesQueueSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 953245a..f4aba01 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.communication.tcp;
+import java.util.Map;
import org.apache.ignite.mxbean.MXBeanDescription;
import org.apache.ignite.spi.IgniteSpiManagementMBean;
@@ -147,6 +148,38 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public long getReceivedBytesCount();
/**
+ * Gets received messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ @MXBeanDescription("Received messages count grouped by message type.")
+ public Map<String, Long> getReceivedMessagesByType();
+
+ /**
+ * Gets received messages counts (grouped by node).
+ *
+ * @return Map containing sender nodes and respective counts.
+ */
+ @MXBeanDescription("Received messages count grouped by sender node.")
+ public Map<String, Long> getReceivedMessagesByNode();
+
+ /**
+ * Gets sent messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ @MXBeanDescription("Sent messages count grouped by message type.")
+ public Map<String, Long> getSentMessagesByType();
+
+ /**
+ * Gets sent messages counts (grouped by node).
+ *
+ * @return Map containing receiver nodes and respective counts.
+ */
+ @MXBeanDescription("Sent messages count grouped by receiver node.")
+ public Map<String, Long> getSentMessagesByNode();
+
+ /**
* Gets outbound messages queue size.
*
* @return Outbound messages queue size.
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
new file mode 100644
index 0000000..f0a8d71
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.GridTestMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for TcpCommunicationSpi statistics.
+ */
+public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Mutex. */
+ final private Object mux = new Object();
+
+ /** */
+ final private CountDownLatch latch = new CountDownLatch(1);
+
+ static {
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
+ }
+
+ /**
+ * CommunicationSPI synchronized by {@code mux}.
+ */
+ private class SynchronizedCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ synchronized (mux) {
+ super.sendMessage(node, msg);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ synchronized (mux) {
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
+ super.notifyListener(sndId, msg, msgC);
+
+ if (msg instanceof GridTestMessage)
+ latch.countDown();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true));
+
+ TcpCommunicationSpi spi = new SynchronizedCommunicationSpi();
+
+ cfg.setCommunicationSpi(spi);
+
+ return cfg;
+ }
+
+ /**
+ * Looks up the communication SPI MBean of the node with the given index.
+ *
+ * @param nodeIdx Node index.
+ * @return Proxy for the MBean registered on the platform MBean server.
+ * @throws MalformedObjectNameException If the MBean name cannot be built.
+ */
+ private TcpCommunicationSpiMBean mbean(int nodeIdx) throws MalformedObjectNameException {
+ String spiName = SynchronizedCommunicationSpi.class.getSimpleName();
+
+ ObjectName name = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "SPIs", spiName);
+
+ MBeanServer srv = ManagementFactory.getPlatformMBeanServer();
+
+ // The test cannot proceed without the MBean, so fail right away.
+ if (!srv.isRegistered(name))
+ fail("MBean is not registered: " + name.getCanonicalName());
+
+ return MBeanServerInvocationHandler.newProxyInstance(srv, name, TcpCommunicationSpiMBean.class, true);
+ }
+
+ /**
+ * Compares two maps for equality.
+ */
+ private static <K, V> boolean mapsEquals(Map<K, V> map1, Map<K, V> map2) {
+ assert map1 != null;
+ assert map2 != null;
+
+ return map1.size() == map2.size() && map1.entrySet().containsAll(map2.entrySet());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStatistics() throws Exception {
+ startGrids(2);
+
+ try {
+ // Send custom message from node0 to node1.
+ grid(0).configuration().getCommunicationSpi().sendMessage(grid(1).cluster().localNode(),
+ new GridTestMessage());
+
+ assertTrue("GridTestMessage was not received in time", latch.await(10, TimeUnit.SECONDS));
+
+ ClusterGroup clusterGroupNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
+
+ // Send job from node0 to node1.
+ grid(0).compute(clusterGroupNode1).call(new IgniteCallable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ return Boolean.TRUE;
+ }
+ });
+
+ synchronized (mux) {
+ TcpCommunicationSpiMBean mbean0 = mbean(0);
+ TcpCommunicationSpiMBean mbean1 = mbean(1);
+
+ Map<String, Long> msgsSentByNode0 = mbean0.getSentMessagesByNode();
+ Map<String, Long> msgsSentByNode1 = mbean1.getSentMessagesByNode();
+ Map<String, Long> msgsReceivedByNode0 = mbean0.getReceivedMessagesByNode();
+ Map<String, Long> msgsReceivedByNode1 = mbean1.getReceivedMessagesByNode();
+
+ String nodeId0 = grid(0).localNode().id().toString();
+ String nodeId1 = grid(1).localNode().id().toString();
+
+ assertEquals(msgsReceivedByNode0.get(nodeId1).longValue(), mbean0.getReceivedMessagesCount());
+ assertEquals(msgsReceivedByNode1.get(nodeId0).longValue(), mbean1.getReceivedMessagesCount());
+ assertEquals(msgsSentByNode0.get(nodeId1).longValue(), mbean0.getSentMessagesCount());
+ assertEquals(msgsSentByNode1.get(nodeId0).longValue(), mbean1.getSentMessagesCount());
+
+ assertEquals(mbean0.getSentMessagesCount(), mbean1.getReceivedMessagesCount());
+ assertEquals(mbean1.getSentMessagesCount(), mbean0.getReceivedMessagesCount());
+
+ Map<String, Long> msgsSentByType0 = mbean0.getSentMessagesByType();
+ Map<String, Long> msgsSentByType1 = mbean1.getSentMessagesByType();
+ Map<String, Long> msgsReceivedByType0 = mbean0.getReceivedMessagesByType();
+ Map<String, Long> msgsReceivedByType1 = mbean1.getReceivedMessagesByType();
+
+ // Node0 sent exactly the same types and count of messages as node1 received.
+ assertTrue(mapsEquals(msgsSentByType0, msgsReceivedByType1));
+
+ // Node1 sent exactly the same types and count of messages as node0 received.
+ assertTrue(mapsEquals(msgsSentByType1, msgsReceivedByType0));
+
+ assertEquals(1, msgsSentByType0.get(GridTestMessage.class.getSimpleName()).longValue());
+ assertEquals(1, msgsReceivedByType1.get(GridTestMessage.class.getSimpleName()).longValue());
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 8e96a3f..7a4de1b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAck
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
/**
* Test suite for all communication SPIs.
@@ -81,6 +82,8 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class));
suite.addTest(new TestSuite(TcpCommunicationSpiHalfOpenedConnectionTest.class));
+ suite.addTest(new TestSuite(TcpCommunicationStatisticsTest.class));
+
return suite;
}
}