You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/17 10:32:47 UTC
[08/17] ignite git commit: Added discovery ring latency test + made
it available from MBean
Added discovery ring latency test + made it available from MBean
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fef59c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fef59c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fef59c7
Branch: refs/heads/ignite-5075
Commit: 1fef59c75e8b2e17a043d47abf24884a26a1644d
Parents: 256a4a3
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue May 16 18:16:42 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue May 16 18:16:42 2017 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 20 +++++
.../ignite/spi/discovery/tcp/ServerImpl.java | 57 ++++++++++++++-
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 7 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 4 +
.../spi/discovery/tcp/TcpDiscoverySpiMBean.java | 22 ++++++
.../TcpDiscoveryRingLatencyCheckMessage.java | 77 ++++++++++++++++++++
6 files changed, 185 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 4c7199c..619b4cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -97,6 +97,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessag
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -831,6 +832,16 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
+ @Override public void checkRingLatency(int maxHops) {
+ TcpDiscoveryRingLatencyCheckMessage msg = new TcpDiscoveryRingLatencyCheckMessage(getLocalNodeId(), maxHops);
+
+ if (log.isInfoEnabled())
+ log.info("Latency check initiated: " + msg.id());
+
+ sockWriter.sendMessage(msg);
+ }
+
+ /** {@inheritDoc} */
@Override protected IgniteSpiThread workerThread() {
return msgWorker;
}
@@ -1221,6 +1232,12 @@ class ClientImpl extends TcpDiscoveryImpl {
msg,
sockTimeout);
+ IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ?
+ msg.id() : null;
+
+ if (latencyCheckId != null && log.isInfoEnabled())
+ log.info("Latency check message has been written to socket: " + latencyCheckId);
+
msg = null;
if (ack) {
@@ -1248,6 +1265,9 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new IOException("Failed to get acknowledge for message: " + unacked);
}
+
+ if (latencyCheckId != null && log.isInfoEnabled())
+ log.info("Latency check message has been acked: " + latencyCheckId);
}
}
catch (InterruptedException ignored) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5961b8d..fce6fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -126,6 +126,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
@@ -1584,6 +1585,16 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
+ @Override public void checkRingLatency(int maxHops) {
+ TcpDiscoveryRingLatencyCheckMessage msg = new TcpDiscoveryRingLatencyCheckMessage(getLocalNodeId(), maxHops);
+
+ if (log.isInfoEnabled())
+ log.info("Latency check initiated: " + msg.id());
+
+ msgWorker.addMessage(msg);
+ }
+
+ /** {@inheritDoc} */
@Override void simulateNodeFailure() {
U.warn(log, "Simulating node failure: " + getLocalNodeId());
@@ -2575,6 +2586,9 @@ class ServerImpl extends TcpDiscoveryImpl {
else if (msg instanceof TcpDiscoveryClientPingRequest)
processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
+ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage)
+ processRingLatencyCheckMessage((TcpDiscoveryRingLatencyCheckMessage)msg);
+
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
@@ -2973,12 +2987,20 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ boolean latencyCheck = msg instanceof TcpDiscoveryRingLatencyCheckMessage;
+
+ if (latencyCheck && log.isInfoEnabled())
+ log.info("Latency check message has been written to socket: " + msg.id());
+
spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
long tstamp0 = U.currentTimeMillis();
int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
+ if (latencyCheck && log.isInfoEnabled())
+ log.info("Latency check message has been acked: " + msg.id());
+
spi.stats.onMessageSent(msg, tstamp0 - tstamp);
onMessageExchanged();
@@ -4375,6 +4397,33 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Processes latency check message.
+ *
+ * @param msg Latency check message.
+ */
+ private void processRingLatencyCheckMessage(TcpDiscoveryRingLatencyCheckMessage msg) {
+ assert msg != null;
+
+ if (msg.maxHopsReached()) {
+ if (log.isInfoEnabled())
+ log.info("Latency check has been discarded (max hops reached) [id=" + msg.id() +
+ ", maxHops=" + msg.maxHops() + ']');
+
+ return;
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Latency check processing: " + msg.id());
+
+ if (sendMessageToRemotes(msg))
+ sendMessageAcrossRing(msg);
+ else {
+ if (log.isInfoEnabled())
+ log.info("Latency check has been discarded (no remote nodes): " + msg.id());
+ }
+ }
+
+ /**
* Processes node left message.
*
* @param msg Node left message.
@@ -5956,7 +6005,7 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
- if (msg instanceof TcpDiscoveryPingResponse) {
+ else if (msg instanceof TcpDiscoveryPingResponse) {
assert msg.client() : msg;
ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
@@ -5966,6 +6015,12 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
+ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) {
+ if (log.isInfoEnabled())
+ log.info("Latency check message has been read: " + msg.id());
+
+ ((TcpDiscoveryRingLatencyCheckMessage)msg).onRead();
+ }
TcpDiscoveryClientMetricsUpdateMessage metricsUpdateMsg = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 66667aa..b31e2e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,6 +38,7 @@ import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;
/**
@@ -292,6 +292,11 @@ abstract class TcpDiscoveryImpl {
public abstract void brakeConnection();
/**
+ * @param maxHops Maximum hops for {@link TcpDiscoveryRingLatencyCheckMessage}.
+ */
+ public abstract void checkRingLatency(int maxHops);
+
+ /**
* <strong>FOR TEST ONLY!!!</strong>
*
* @return Worker thread.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 46ede4d..cc581e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2344,5 +2344,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
@Override public long getCoordinatorSinceTimestamp() {
return TcpDiscoverySpi.this.getCoordinatorSinceTimestamp();
}
+
+ @Override public void checkRingLatency(int maxHops) {
+ TcpDiscoverySpi.this.impl.checkRingLatency(maxHops);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
index a05ecde..fc78451 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -20,6 +20,8 @@ package org.apache.ignite.spi.discovery.tcp;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.mxbean.MXBeanDescription;
+import org.apache.ignite.mxbean.MXBeanParametersDescriptions;
+import org.apache.ignite.mxbean.MXBeanParametersNames;
import org.apache.ignite.spi.IgniteSpiManagementMBean;
import org.jetbrains.annotations.Nullable;
@@ -257,4 +259,24 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
*/
@MXBeanDescription("Client mode.")
public boolean isClientMode() throws IllegalStateException;
+
+ /**
+ * Diagnosis method for determining ring message latency.
+ * On this method call special message will be sent across the ring
+ * and stats about the message will appear in the logs of each node.
+ *
+ * @param maxHops Maximum hops for the message (3 * TOTAL_NODE_CNT is recommended).
+ */
+ @MXBeanDescription("Check ring latency.")
+ @MXBeanParametersNames(
+ {
+ "maxHops"
+ }
+ )
+ @MXBeanParametersDescriptions(
+ {
+ "Maximum hops for the message (3 * TOTAL_NODE_CNT is recommended)."
+ }
+ )
+ public void checkRingLatency(int maxHops);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java
new file mode 100644
index 0000000..d8c1145
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class TcpDiscoveryRingLatencyCheckMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int maxHops;
+
+ /** */
+ private int curHop;
+
+ /**
+ * @param creatorNodeId Creator node ID.
+ * @param maxHops Max hops for this message.
+ */
+ public TcpDiscoveryRingLatencyCheckMessage(
+ UUID creatorNodeId,
+ int maxHops
+ ) {
+ super(creatorNodeId);
+
+ assert maxHops > 0;
+
+ this.maxHops = maxHops;
+ }
+
+ /**
+ *
+ */
+ public void onRead() {
+ curHop++;
+ }
+
+ /**
+ * @return Max hops.
+ */
+ public int maxHops() {
+ return maxHops;
+ }
+
+ /**
+ * @return {@code True} if max hops has been reached.
+ */
+ public boolean maxHopsReached() {
+ return curHop == maxHops;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryRingLatencyCheckMessage.class, this, "super", super.toString());
+ }
+}