You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/17 10:32:47 UTC

[08/17] ignite git commit: Added discovery ring latency test + made it available from MBean

Added discovery ring latency test + made it available from MBean


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fef59c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fef59c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fef59c7

Branch: refs/heads/ignite-5075
Commit: 1fef59c75e8b2e17a043d47abf24884a26a1644d
Parents: 256a4a3
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue May 16 18:16:42 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue May 16 18:16:42 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 20 +++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 57 ++++++++++++++-
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  7 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  4 +
 .../spi/discovery/tcp/TcpDiscoverySpiMBean.java | 22 ++++++
 .../TcpDiscoveryRingLatencyCheckMessage.java    | 77 ++++++++++++++++++++
 6 files changed, 185 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 4c7199c..619b4cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -97,6 +97,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessag
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -831,6 +832,16 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void checkRingLatency(int maxHops) {
+        TcpDiscoveryRingLatencyCheckMessage msg = new TcpDiscoveryRingLatencyCheckMessage(getLocalNodeId(), maxHops);
+
+        if (log.isInfoEnabled())
+            log.info("Latency check initiated: " + msg.id());
+
+        sockWriter.sendMessage(msg);
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteSpiThread workerThread() {
         return msgWorker;
     }
@@ -1221,6 +1232,12 @@ class ClientImpl extends TcpDiscoveryImpl {
                         msg,
                         sockTimeout);
 
+                    IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ?
+                        msg.id() : null;
+
+                    if (latencyCheckId != null && log.isInfoEnabled())
+                        log.info("Latency check message has been written to socket: " + latencyCheckId);
+
                     msg = null;
 
                     if (ack) {
@@ -1248,6 +1265,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                             throw new IOException("Failed to get acknowledge for message: " + unacked);
                         }
+
+                        if (latencyCheckId != null && log.isInfoEnabled())
+                            log.info("Latency check message has been acked: " + latencyCheckId);
                     }
                 }
                 catch (InterruptedException ignored) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5961b8d..fce6fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -126,6 +126,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
@@ -1584,6 +1585,16 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void checkRingLatency(int maxHops) {
+        TcpDiscoveryRingLatencyCheckMessage msg = new TcpDiscoveryRingLatencyCheckMessage(getLocalNodeId(), maxHops);
+
+        if (log.isInfoEnabled())
+            log.info("Latency check initiated: " + msg.id());
+
+        msgWorker.addMessage(msg);
+    }
+
+    /** {@inheritDoc} */
     @Override void simulateNodeFailure() {
         U.warn(log, "Simulating node failure: " + getLocalNodeId());
 
@@ -2575,6 +2586,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             else if (msg instanceof TcpDiscoveryClientPingRequest)
                 processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
 
+            else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage)
+                processRingLatencyCheckMessage((TcpDiscoveryRingLatencyCheckMessage)msg);
+
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
@@ -2973,12 +2987,20 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     }
                                 }
 
+                                boolean latencyCheck = msg instanceof TcpDiscoveryRingLatencyCheckMessage;
+
+                                if (latencyCheck && log.isInfoEnabled())
+                                    log.info("Latency check message has been written to socket: " + msg.id());
+
                                 spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 long tstamp0 = U.currentTimeMillis();
 
                                 int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
+                                if (latencyCheck && log.isInfoEnabled())
+                                    log.info("Latency check message has been acked: " + msg.id());
+
                                 spi.stats.onMessageSent(msg, tstamp0 - tstamp);
 
                                 onMessageExchanged();
@@ -4375,6 +4397,33 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Processes latency check message.
+         *
+         * @param msg Latency check message.
+         */
+        private void processRingLatencyCheckMessage(TcpDiscoveryRingLatencyCheckMessage msg) {
+            assert msg != null;
+
+            if (msg.maxHopsReached()) {
+                if (log.isInfoEnabled())
+                    log.info("Latency check has been discarded (max hops reached) [id=" + msg.id() +
+                        ", maxHops=" + msg.maxHops() + ']');
+
+                return;
+            }
+
+            if (log.isInfoEnabled())
+                log.info("Latency check processing: " + msg.id());
+
+            if (sendMessageToRemotes(msg))
+                sendMessageAcrossRing(msg);
+            else {
+                if (log.isInfoEnabled())
+                    log.info("Latency check has been discarded (no remote nodes): " + msg.id());
+            }
+        }
+
+        /**
          * Processes node left message.
          *
          * @param msg Node left message.
@@ -5956,7 +6005,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             continue;
                         }
-                        if (msg instanceof TcpDiscoveryPingResponse) {
+                        else if (msg instanceof TcpDiscoveryPingResponse) {
                             assert msg.client() : msg;
 
                             ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
@@ -5966,6 +6015,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             continue;
                         }
+                        else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) {
+                            if (log.isInfoEnabled())
+                                log.info("Latency check message has been read: " + msg.id());
+
+                            ((TcpDiscoveryRingLatencyCheckMessage)msg).onRead();
+                        }
 
                         TcpDiscoveryClientMetricsUpdateMessage metricsUpdateMsg = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 66667aa..b31e2e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,6 +38,7 @@ import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -292,6 +292,11 @@ abstract class TcpDiscoveryImpl {
     public abstract void brakeConnection();
 
     /**
+     * @param maxHops Maximum hops for {@link TcpDiscoveryRingLatencyCheckMessage}.
+     */
+    public abstract void checkRingLatency(int maxHops);
+
+    /**
      * <strong>FOR TEST ONLY!!!</strong>
      *
      * @return Worker thread.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 46ede4d..cc581e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2344,5 +2344,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
         @Override public long getCoordinatorSinceTimestamp() {
             return TcpDiscoverySpi.this.getCoordinatorSinceTimestamp();
         }
+
+        @Override public void checkRingLatency(int maxHops) {
+            TcpDiscoverySpi.this.impl.checkRingLatency(maxHops);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
index a05ecde..fc78451 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -20,6 +20,8 @@ package org.apache.ignite.spi.discovery.tcp;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.mxbean.MXBeanDescription;
+import org.apache.ignite.mxbean.MXBeanParametersDescriptions;
+import org.apache.ignite.mxbean.MXBeanParametersNames;
 import org.apache.ignite.spi.IgniteSpiManagementMBean;
 import org.jetbrains.annotations.Nullable;
 
@@ -257,4 +259,24 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
      */
     @MXBeanDescription("Client mode.")
     public boolean isClientMode() throws IllegalStateException;
+
+    /**
+     * Diagnostic method for determining ring message latency.
+     * When this method is called, a special message is sent across the ring
+     * and stats about the message appear in the logs of each node.
+     *
+     * @param maxHops Maximum hops for the message (3 * TOTAL_NODE_CNT is recommended).
+     */
+    @MXBeanDescription("Check ring latency.")
+    @MXBeanParametersNames(
+        {
+            "maxHops"
+        }
+    )
+    @MXBeanParametersDescriptions(
+        {
+            "Maximum hops for the message (3 * TOTAL_NODE_CNT is recommended)."
+        }
+    )
+    public void checkRingLatency(int maxHops);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fef59c7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java
new file mode 100644
index 0000000..d8c1145
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryRingLatencyCheckMessage.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Diagnostic message sent across the discovery ring to check its latency; carries a hop counter bounded by max hops.
+ */
+public class TcpDiscoveryRingLatencyCheckMessage extends TcpDiscoveryAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Maximum number of hops this message may make. */
+    private int maxHops;
+
+    /** Number of hops made so far; incremented by {@link #onRead()}. */
+    private int curHop;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param maxHops Max hops for this message, expected to be positive (checked by assertion only).
+     */
+    public TcpDiscoveryRingLatencyCheckMessage(
+        UUID creatorNodeId,
+        int maxHops
+    ) {
+        super(creatorNodeId);
+
+        assert maxHops > 0;
+
+        this.maxHops = maxHops;
+    }
+
+    /**
+     * Increments the hop counter; to be called each time the message is read by a node.
+     */
+    public void onRead() {
+        curHop++;
+    }
+
+    /**
+     * @return Max hops.
+     */
+    public int maxHops() {
+        return maxHops;
+    }
+
+    /**
+     * @return {@code True} if max hops has been reached; {@code >=} also stops a message created with non-positive max hops.
+     */
+    public boolean maxHopsReached() {
+        return curHop >= maxHops;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryRingLatencyCheckMessage.class, this, "super", super.toString());
+    }
+}