You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/03 12:52:42 UTC

ignite git commit: debug info

Repository: ignite
Updated Branches:
  refs/heads/ignite-gg-8.0.3.ea6-clients-test eaf62e080 -> 54668f1ae


debug info


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54668f1a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54668f1a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54668f1a

Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: 54668f1ae9c8c3e31cb4148c29ca397beccb9ded
Parents: eaf62e0
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 3 14:32:50 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 3 15:52:18 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../internal/IgniteDiagnosticMessage.java       | 264 +++++++++++++++++++
 .../apache/ignite/internal/IgniteKernal.java    |   2 +
 .../communication/GridIoMessageFactory.java     |   5 +
 .../processors/cluster/ClusterProcessor.java    | 224 ++++++++++++++++
 .../ignite/internal/util/nio/GridNioServer.java | 200 ++++++++++----
 .../communication/tcp/TcpCommunicationSpi.java  | 107 ++++++--
 7 files changed, 722 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 86245a8..2fd7797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -102,7 +102,10 @@ public enum GridTopic {
     TOPIC_IO_TEST,
 
     /** */
-    TOPIC_HADOOP_MSG;
+    TOPIC_HADOOP_MSG,
+
+    /** */
+    TOPIC_INTERNAL_DIAGNOSTIC;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
new file mode 100644
index 0000000..7783b81
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class IgniteDiagnosticMessage implements Message {
+    /** */
+    private static final ThreadLocal<DateFormat> dateFormat = new ThreadLocal<DateFormat>() {
+        @Override protected DateFormat initialValue() {
+            return new SimpleDateFormat("HH:mm:ss.SSS");
+        }
+    };
+
+    /** */
+    private long futId;
+
+    /** */
+    private String msg;
+
+    /** */
+    private byte[] cBytes;
+
+    public IgniteDiagnosticMessage() {
+        // No-op.
+    }
+
+    public static IgniteDiagnosticMessage createRequest(GridKernalContext ctx, IgniteClosure<GridKernalContext, String> c, long futId) throws IgniteCheckedException {
+        byte[] cBytes = U.marshal(ctx.config().getMarshaller(), c);
+
+        IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+        msg.futId = futId;
+        msg.cBytes = cBytes;
+
+        return msg;
+    }
+
+    public static IgniteDiagnosticMessage createResponse(String msg0, long futId) {
+        IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+        msg.futId = futId;
+        msg.msg = msg0;
+
+        return msg;
+    }
+
+    public IgniteClosure<GridKernalContext, String> unmarshalClosure(GridKernalContext ctx) throws IgniteCheckedException {
+        assert cBytes != null;
+
+        return U.unmarshal(ctx, cBytes, null);
+    }
+
+    public long futureId() {
+        return futId;
+    }
+
+    public boolean request() {
+        return cBytes != null;
+    }
+
+    public String message() {
+        return msg;
+    }
+
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("cBytes", cBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeString("msg", msg))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cBytes = reader.readByteArray("cBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                msg = reader.readString("msg");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(IgniteDiagnosticMessage.class);
+    }
+
+    @Override public byte directType() {
+        return -46;
+    }
+
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    @Override public void onAckReceived() {
+
+    }
+
+    /**
+     *
+     */
+    public static class BaseClosure implements IgniteClosure<GridKernalContext, String> {
+        /** */
+        protected final UUID nodeId;
+
+        /**
+         * @param ctx Local node context.
+         */
+        public BaseClosure(GridKernalContext ctx) {
+            this.nodeId = ctx.localNodeId();
+        }
+
+        @Override public String apply(GridKernalContext ctx) {
+            try {
+                StringBuilder sb = new StringBuilder();
+
+                IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx, nodeId);
+
+                sb.append(dumpNodeBasicInfo(ctx));
+
+                sb.append(U.nl()).append(dumpExchangeInfo(ctx));
+
+                String moreInfo = dumpInfo(ctx);
+
+                sb.append(U.nl()).append(commInfo.get());
+
+                if (moreInfo != null)
+                    sb.append(U.nl()).append(moreInfo);
+
+                return sb.toString();
+            }
+            catch (Exception e) {
+                ctx.cluster().diagnosticLog().error("Failed to execute diagnostic message closure: " + e, e);
+
+                return "Failed to execute diagnostic message closure: " + e;
+            }
+        }
+
+        protected String dumpInfo(GridKernalContext ctx) {
+            return null;
+        }
+    }
+
+    public static String dumpNodeBasicInfo(GridKernalContext ctx) {
+        StringBuilder sb = new StringBuilder("General node info [id=").append(ctx.localNodeId());
+
+        sb.append(", client=").append(ctx.clientNode());
+        sb.append(", discoTopVer=").append(ctx.discovery().topologyVersionEx());
+        sb.append(", time=").append(formatTime(U.currentTimeMillis()));
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    static String dumpExchangeInfo(GridKernalContext ctx) {
+        GridCachePartitionExchangeManager exchMgr = ctx.cache().context().exchange();
+
+        StringBuilder sb = new StringBuilder("Partitions exchange info [readyVer=").append(exchMgr.readyAffinityVersion());
+        sb.append("]");
+
+        GridDhtTopologyFuture fut = exchMgr.lastTopologyFuture();
+
+        sb.append(U.nl()).append("Last initialized exchange future: ").append(fut);
+        
+        return sb.toString();
+    }
+
+    public static IgniteInternalFuture<String> dumpCommunicationInfo(GridKernalContext ctx, UUID nodeId) {
+        if (ctx.config().getCommunicationSpi() instanceof TcpCommunicationSpi)
+            return ((TcpCommunicationSpi) ctx.config().getCommunicationSpi()).dumpNodeStatistics(nodeId);
+        else
+            return new GridFinishedFuture<>("Unexpected communication SPI: " + ctx.config().getCommunicationSpi());
+    }
+    /**
+     * @param time Time.
+     * @return Time string.
+     */
+    private static String formatTime(long time) {
+        return dateFormat.get().format(new Date(time));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 391509e..0a95f91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -911,6 +911,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     provider.start(ctx.plugins().pluginContextForProvider(provider));
                 }
 
+                ctx.cluster().initListeners();
+
                 fillNodeAttributes(clusterProc.updateNotifierEnabled());
             }
             catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f73682..f443f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridJobSiblingsRequest;
 import org.apache.ignite.internal.GridJobSiblingsResponse;
 import org.apache.ignite.internal.GridTaskCancelRequest;
 import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
 import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
@@ -172,6 +173,10 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -46:
+                msg = new IgniteDiagnosticMessage();
+
+                break;
 
             case -45:
                 msg = new GridChangeGlobalStateMessageResponse();

http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 124cb4b..0d0aead 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -23,22 +23,42 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteProperties;
 import org.apache.ignite.internal.cluster.IgniteClusterImpl;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.util.GridTimerTask;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
 
 /**
@@ -46,6 +66,9 @@ import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
  */
 public class ClusterProcessor extends GridProcessorAdapter {
     /** */
+    public static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic";
+
+    /** */
     private static final String ATTR_UPDATE_NOTIFIER_STATUS = "UPDATE_NOTIFIER_STATUS";
 
     /** Periodic version check delay. */
@@ -68,6 +91,15 @@ public class ClusterProcessor extends GridProcessorAdapter {
     @GridToStringExclude
     private GridUpdateNotifier verChecker;
 
+    /** */
+    private final IgniteLogger diagnosticLog;
+
+    /** */
+    private final AtomicReference<ConcurrentHashMap<Long, InternalDiagnosticFuture>> diagnosticFutMap = new AtomicReference<>();
+
+    /** */
+    private final AtomicLong diagFutId = new AtomicLong();
+
     /**
      * @param ctx Kernal context.
      */
@@ -78,6 +110,82 @@ public class ClusterProcessor extends GridProcessorAdapter {
             Boolean.parseBoolean(IgniteProperties.get("ignite.update.notifier.enabled.by.default"))));
 
         cluster = new IgniteClusterImpl(ctx);
+
+        diagnosticLog = ctx.log(DIAGNOSTIC_LOG_CATEGORY);
+    }
+
+    public void initListeners() throws IgniteCheckedException {
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    assert evt instanceof DiscoveryEvent;
+                    assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                    UUID nodeId = discoEvt.eventNode().id();
+
+                    ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get();
+
+                    if (futs != null) {
+                        for (InternalDiagnosticFuture fut : futs.values()) {
+                            if (fut.nodeId.equals(nodeId))
+                                fut.onDone("Target node failed: " + nodeId);
+                        }
+                    }
+                }
+            },
+            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        ctx.io().addMessageListener(GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node == null)
+                    return;
+
+                if (msg instanceof IgniteDiagnosticMessage) {
+                    IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
+
+                    if (msg0.request()) {
+                        String resMsg;
+
+                        IgniteClosure<GridKernalContext, String> c = null;
+
+                        try {
+                            c = msg0.unmarshalClosure(ctx);
+
+                            resMsg = c.apply(ctx);
+                        }
+                        catch (Exception e) {
+                            U.error(diagnosticLog, "Failed to run diagnostic closure: " + e, e);
+
+                            resMsg = "Failed to run diagnostic closure: " + e;
+                        }
+
+                        IgniteDiagnosticMessage res = IgniteDiagnosticMessage.createResponse(resMsg, msg0.futureId());
+
+                        try {
+                            ctx.io().send(node, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(diagnosticLog, "Failed to send diagnostic response [msg=" + msg0 + "]", e);
+                        }
+                    }
+                    else {
+                        InternalDiagnosticFuture fut = diagnosticFuturesMap().get(msg0.futureId());
+
+                        if (fut != null)
+                            fut.onResponse(msg0);
+                        else
+                            U.warn(diagnosticLog, "Failed to find diagnostic message future [msg=" + msg0 + ']');
+                    }
+                }
+            }
+        });
+    }
+
+    public IgniteLogger diagnosticLog() {
+        return diagnosticLog;
     }
 
     /**
@@ -177,6 +285,89 @@ public class ClusterProcessor extends GridProcessorAdapter {
         return verChecker != null ? verChecker.latestVersion() : null;
     }
 
+    public IgniteInternalFuture<String> diagnosticInfo(final UUID nodeId,
+        IgniteClosure<GridKernalContext, String> c,
+        final String msg) {
+        final GridFutureAdapter<String> infoFut = new GridFutureAdapter<>();
+
+        final IgniteInternalFuture<String> rmtFut = sendDiagnosticMessage(nodeId, c);
+
+        rmtFut.listen(new CI1<IgniteInternalFuture<String>>() {
+            @Override public void apply(IgniteInternalFuture<String> fut) {
+                String rmtMsg;
+
+                try {
+                    rmtMsg = fut.get();
+                }
+                catch (Exception e) {
+                    rmtMsg = "Diagnostic processing error: " + e;
+                }
+
+                final String rmtMsg0 = rmtMsg;
+
+                IgniteInternalFuture<String> locFut = IgniteDiagnosticMessage.dumpCommunicationInfo(ctx, nodeId);
+
+                locFut.listen(new CI1<IgniteInternalFuture<String>>() {
+                    @Override public void apply(IgniteInternalFuture<String> locFut) {
+                        String locMsg;
+
+                        try {
+                            locMsg = locFut.get();
+                        }
+                        catch (Exception e) {
+                            locMsg = "Failed to get info for local node: " + e;
+                        }
+
+                        StringBuilder sb = new StringBuilder(msg);
+
+                        sb.append(U.nl());
+                        sb.append("Remote node information:").append(U.nl()).append(rmtMsg0);
+
+                        sb.append(U.nl()).append("Local communication statistics:").append(U.nl());
+                        sb.append(locMsg);
+
+                        infoFut.onDone(sb.toString());
+                    }
+                });
+            }
+        });
+
+        return infoFut;
+    }
+
+    private IgniteInternalFuture<String> sendDiagnosticMessage(UUID nodeId, IgniteClosure<GridKernalContext, String> c) {
+        try {
+            IgniteDiagnosticMessage msg = IgniteDiagnosticMessage.createRequest(ctx, c, diagFutId.getAndIncrement());
+
+            InternalDiagnosticFuture fut = new InternalDiagnosticFuture(nodeId, msg.futureId());
+
+            diagnosticFuturesMap().put(msg.futureId(), fut);
+
+            ctx.io().send(nodeId, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL);
+
+            return fut;
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to send diagnostic message: " + e);
+
+            return new GridFinishedFuture<>("Failed to send diagnostic message: " + e);
+        }
+    }
+
+    /**
+     * @return Diagnostic messages futures map.
+     */
+    private ConcurrentHashMap<Long, InternalDiagnosticFuture> diagnosticFuturesMap() {
+        ConcurrentHashMap<Long, InternalDiagnosticFuture> map = diagnosticFutMap.get();
+
+        if (map == null) {
+            if (!diagnosticFutMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
+                map = diagnosticFutMap.get();
+        }
+
+        return map;
+    }
+
     /**
      * Update notifier timer task.
      */
@@ -245,4 +436,37 @@ public class ClusterProcessor extends GridProcessorAdapter {
             }
         }
     }
+
+    /**
+     *
+     */
+    class InternalDiagnosticFuture extends GridFutureAdapter<String> {
+        /** */
+        private final long id;
+
+        /** */
+        private final UUID nodeId;
+
+        /**
+         * @param id Future ID.
+         */
+        public InternalDiagnosticFuture(UUID nodeId, long id) {
+            this.nodeId = nodeId;
+            this.id = id;
+        }
+
+        public void onResponse(IgniteDiagnosticMessage msg) {
+            onDone(msg.message());
+        }
+
+        @Override public boolean onDone(@Nullable String res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                diagnosticFuturesMap().remove(id);
+
+                return true;
+            }
+
+            return false;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a59adba..2ea078f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
@@ -55,6 +56,7 @@ import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -709,6 +712,45 @@ public class GridNioServer<T> {
             clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS));
     }
 
+    public IgniteInternalFuture<String> dumpNodeStats(final String msg, IgnitePredicate<GridNioSession> p) {
+        GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new IgniteReducer<String, String>() {
+            private final StringBuilder sb = new StringBuilder(msg);
+
+            @Override public boolean collect(@Nullable String msg) {
+                if (!F.isEmpty(msg)) {
+                    synchronized (sb) {
+                        if (sb.length() > 0)
+                            sb.append(U.nl());
+
+                        sb.append(msg);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public String reduce() {
+                synchronized (sb) {
+                    return sb.toString();
+                }
+            }
+        });
+
+        for (int i = 0; i < clientWorkers.size(); i++) {
+            NioOperationFuture<String> opFut = new NioOperationFuture<>(null, NioOperation.DUMP_STATS);
+
+            opFut.msg = p;
+
+            clientWorkers.get(i).offer(opFut);
+
+            fut.add(opFut);
+        }
+
+        fut.markInitialized();
+
+        return fut;
+    }
+
     /**
      * Establishes a session.
      *
@@ -1807,12 +1849,28 @@ public class GridNioServer<T> {
                             case DUMP_STATS: {
                                 NioOperationFuture req = (NioOperationFuture)req0;
 
-                                try {
-                                    dumpStats();
+                                if (req.msg instanceof IgnitePredicate) {
+                                    StringBuilder sb = new StringBuilder();
+
+                                    try {
+                                        dumpStats(sb, (IgnitePredicate<GridNioSession>)req.msg);
+                                    }
+                                    finally {
+                                        req.onDone(sb.toString());
+                                    }
                                 }
-                                finally {
-                                    // Complete the request just in case (none should wait on this future).
-                                    req.onDone(true);
+                                else {
+                                    try {
+                                        StringBuilder sb = new StringBuilder();
+
+                                        dumpStats(sb, null);
+
+                                        U.warn(log, sb.toString());
+                                    }
+                                    finally {
+                                        // Complete the request just in case (none should wait on this future).
+                                        req.onDone(true);
+                                    }
                                 }
                             }
                         }
@@ -1919,81 +1977,111 @@ public class GridNioServer<T> {
             }
         }
 
-        /**
-         *
-         */
-        private void dumpStats() {
-            StringBuilder sb = new StringBuilder();
-
-            Set<SelectionKey> keys = selector.keys();
-
-            sb.append(U.nl())
-                .append(">> Selector info [idx=").append(idx)
+        private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) {
+            sb.append(">> Selector info [idx=").append(idx)
                 .append(", keysCnt=").append(keys.size())
                 .append(", bytesRcvd=").append(bytesRcvd)
                 .append(", bytesRcvd0=").append(bytesRcvd0)
                 .append(", bytesSent=").append(bytesSent)
                 .append(", bytesSent0=").append(bytesSent0)
                 .append("]").append(U.nl());
+        }
+
+        /**
+         *
+         */
+        private void dumpStats(StringBuilder sb, @Nullable IgnitePredicate<GridNioSession> p) {
+            Set<SelectionKey> keys = selector.keys();
+
+            boolean selInfo = p == null;
+
+            if (selInfo)
+                dumpSelectorInfo(sb, keys);
 
             for (SelectionKey key : keys) {
                 GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
-                MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-                MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+                boolean sesInfo = p == null || p.apply(ses);
 
-                sb.append("    Connection info [")
-                    .append("in=").append(ses.accepted())
-                    .append(", rmtAddr=").append(ses.remoteAddress())
-                    .append(", locAddr=").append(ses.localAddress());
+                if (sesInfo) {
+                    if (!selInfo) {
+                        dumpSelectorInfo(sb, keys);
 
-                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+                        selInfo = true;
+                    }
 
-                if (outDesc != null) {
-                    sb.append(", msgsSent=").append(outDesc.sent())
-                        .append(", msgsAckedByRmt=").append(outDesc.acked())
-                        .append(", descIdHash=").append(System.identityHashCode(outDesc));
-                }
-                else
-                    sb.append(", outRecoveryDesc=null");
+                    MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                    MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
 
-                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+                    sb.append("    Connection info [")
+                        .append("in=").append(ses.accepted())
+                        .append(", rmtAddr=").append(ses.remoteAddress())
+                        .append(", locAddr=").append(ses.localAddress());
 
-                if (inDesc != null) {
-                    sb.append(", msgsRcvd=").append(inDesc.received())
-                        .append(", lastAcked=").append(inDesc.lastAcknowledged())
-                        .append(", descIdHash=").append(System.identityHashCode(inDesc));
-                }
-                else
-                    sb.append(", inRecoveryDesc=null");
+                    GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+
+                    if (outDesc != null) {
+                        sb.append(", msgsSent=").append(outDesc.sent())
+                            .append(", msgsAckedByRmt=").append(outDesc.acked())
+                            .append(", descIdHash=").append(System.identityHashCode(outDesc));
+
+                        if (!outDesc.messagesRequests().isEmpty()) {
+                            int cnt = 0;
+
+                            sb.append(", unackedMsgs=[");
 
-                sb.append(", bytesRcvd=").append(ses.bytesReceived())
-                    .append(", bytesRcvd0=").append(ses.bytesReceived0())
-                    .append(", bytesSent=").append(ses.bytesSent())
-                    .append(", bytesSent0=").append(ses.bytesSent0())
-                    .append(", opQueueSize=").append(ses.writeQueueSize())
-                    .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
-                    .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+                            for (SessionWriteRequest req : outDesc.messagesRequests()) {
+                                if (cnt != 0)
+                                    sb.append(", ");
 
-                int cnt = 0;
+                                sb.append(req.message());
 
-                for (SessionWriteRequest req : ses.writeQueue()) {
-                    if (cnt == 0)
-                        sb.append(",\n opQueue=[").append(req);
+                                if (++cnt == 5)
+                                    break;
+                            }
+
+                            sb.append(']');
+                        }
+                    }
                     else
-                        sb.append(',').append(req);
+                        sb.append(", outRecoveryDesc=null");
 
-                    if (++cnt == 5) {
-                        sb.append(']');
+                    GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
 
-                        break;
+                    if (inDesc != null) {
+                        sb.append(", msgsRcvd=").append(inDesc.received())
+                            .append(", lastAcked=").append(inDesc.lastAcknowledged())
+                            .append(", descIdHash=").append(System.identityHashCode(inDesc));
                     }
-                }
+                    else
+                        sb.append(", inRecoveryDesc=null");
 
-                sb.append("]").append(U.nl());
-            }
+                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                        .append(", bytesRcvd0=").append(ses.bytesReceived0())
+                        .append(", bytesSent=").append(ses.bytesSent())
+                        .append(", bytesSent0=").append(ses.bytesSent0())
+                        .append(", opQueueSize=").append(ses.writeQueueSize())
+                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                        .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+
+                    int cnt = 0;
 
-            U.warn(log, sb.toString());
+                    for (SessionWriteRequest req : ses.writeQueue()) {
+                        if (cnt == 0)
+                            sb.append(",\n opQueue=[").append(req);
+                        else
+                            sb.append(',').append(req);
+
+                        if (++cnt == 5) {
+                            sb.append(']');
+
+                            break;
+                        }
+                    }
+
+                    sb.append("]");
+                }
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 47498dd..650d25b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.ipc.IpcEndpoint;
 import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -1650,17 +1651,65 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         rcvdBytesCnt.add(-rcvdBytesCnt.sum());
     }
 
+    /**
+     * @param nodeId Target node ID.
+     * @return Future.
+     */
+    public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) {
+        StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl());
+
+        dumpInfo(sb, nodeId);
+
+        GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+        if (nioSrvr != null) {
+            sb.append("NIO sessions statistics:");
+
+            IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>() {
+                @Override public boolean apply(GridNioSession ses) {
+                    ConnectionKey connId = ses.meta(CONN_IDX_META);
+
+                    return connId != null && nodeId.equals(connId.nodeId());
+                }
+            };
+
+            return nioSrvr.dumpNodeStats(sb.toString(), p);
+        }
+        else {
+            sb.append(U.nl()).append("GridNioServer is null.");
+
+            return new GridFinishedFuture<>(sb.toString());
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void dumpStats() {
         IgniteLogger log = this.log;
 
         if (log != null) {
-            StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
+            StringBuilder sb = new StringBuilder();
+
+            dumpInfo(sb, null);
+
+            U.warn(log, sb.toString());
+        }
+
+        GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+        if (nioSrvr != null)
+            nioSrvr.dumpStats();
+    }
+
+    private void dumpInfo(StringBuilder sb, UUID dstNodeId) {
+        sb.append("Communication SPI recovery descriptors: ").append(U.nl());
+
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
 
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
-                GridNioRecoveryDescriptor desc = entry.getValue();
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
 
-                sb.append("    [key=").append(entry.getKey())
+            sb.append("    [key=").append(entry.getKey())
                     .append(", msgsSent=").append(desc.sent())
                     .append(", msgsAckedByRmt=").append(desc.acked())
                     .append(", msgsRcvd=").append(desc.received())
@@ -1668,12 +1717,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(", reserveCnt=").append(desc.reserveCount())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
-            }
+        }
+
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
 
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
-                GridNioRecoveryDescriptor desc = entry.getValue();
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
 
-                sb.append("    [key=").append(entry.getKey())
+            sb.append("    [key=").append(entry.getKey())
                     .append(", msgsSent=").append(desc.sent())
                     .append(", msgsAckedByRmt=").append(desc.acked())
                     .append(", reserveCnt=").append(desc.reserveCount())
@@ -1681,12 +1733,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(", reserved=").append(desc.reserved())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
-            }
+        }
+
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
 
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
-                GridNioRecoveryDescriptor desc = entry.getValue();
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
 
-                sb.append("    [key=").append(entry.getKey())
+            sb.append("    [key=").append(entry.getKey())
                     .append(", msgsRcvd=").append(desc.received())
                     .append(", lastAcked=").append(desc.lastAcknowledged())
                     .append(", reserveCnt=").append(desc.reserveCount())
@@ -1695,30 +1750,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(", handshakeIdx=").append(desc.handshakeIndex())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
-            }
+        }
 
-            sb.append("Communication SPI clients: ").append(U.nl());
+        sb.append("Communication SPI clients: ").append(U.nl());
 
-            for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
-                UUID nodeId = entry.getKey();
-                GridCommunicationClient[] clients0 = entry.getValue();
+        for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
+            UUID clientNodeId = entry.getKey();
 
-                for (GridCommunicationClient client : clients0) {
-                    if (client != null) {
-                        sb.append("    [node=").append(nodeId)
+            if (dstNodeId != null && !dstNodeId.equals(clientNodeId))
+                continue;
+
+            GridCommunicationClient[] clients0 = entry.getValue();
+
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    sb.append("    [node=").append(clientNodeId)
                             .append(", client=").append(client)
                             .append(']').append(U.nl());
-                    }
                 }
             }
-
-            U.warn(log, sb.toString());
         }
-
-        GridNioServer<Message> nioSrvr = this.nioSrvr;
-
-        if (nioSrvr != null)
-            nioSrvr.dumpStats();
     }
 
     /** */