You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/03 12:52:42 UTC
ignite git commit: debug info
Repository: ignite
Updated Branches:
refs/heads/ignite-gg-8.0.3.ea6-clients-test eaf62e080 -> 54668f1ae
debug info
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54668f1a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54668f1a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54668f1a
Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: 54668f1ae9c8c3e31cb4148c29ca397beccb9ded
Parents: eaf62e0
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 3 14:32:50 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 3 15:52:18 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/GridTopic.java | 5 +-
.../internal/IgniteDiagnosticMessage.java | 264 +++++++++++++++++++
.../apache/ignite/internal/IgniteKernal.java | 2 +
.../communication/GridIoMessageFactory.java | 5 +
.../processors/cluster/ClusterProcessor.java | 224 ++++++++++++++++
.../ignite/internal/util/nio/GridNioServer.java | 200 ++++++++++----
.../communication/tcp/TcpCommunicationSpi.java | 107 ++++++--
7 files changed, 722 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 86245a8..2fd7797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -102,7 +102,10 @@ public enum GridTopic {
TOPIC_IO_TEST,
/** */
- TOPIC_HADOOP_MSG;
+ TOPIC_HADOOP_MSG,
+
+ /** */
+ TOPIC_INTERNAL_DIAGNOSTIC;
/** Enum values. */
private static final GridTopic[] VALS = values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
new file mode 100644
index 0000000..7783b81
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class IgniteDiagnosticMessage implements Message {
+ /** */
+ private static final ThreadLocal<DateFormat> dateFormat = new ThreadLocal<DateFormat>() {
+ @Override protected DateFormat initialValue() {
+ return new SimpleDateFormat("HH:mm:ss.SSS");
+ }
+ };
+
+ /** */
+ private long futId;
+
+ /** */
+ private String msg;
+
+ /** */
+ private byte[] cBytes;
+
+ public IgniteDiagnosticMessage() {
+ // No-op.
+ }
+
+ public static IgniteDiagnosticMessage createRequest(GridKernalContext ctx, IgniteClosure<GridKernalContext, String> c, long futId) throws IgniteCheckedException {
+ byte[] cBytes = U.marshal(ctx.config().getMarshaller(), c);
+
+ IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+ msg.futId = futId;
+ msg.cBytes = cBytes;
+
+ return msg;
+ }
+
+ public static IgniteDiagnosticMessage createResponse(String msg0, long futId) {
+ IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+ msg.futId = futId;
+ msg.msg = msg0;
+
+ return msg;
+ }
+
+ public IgniteClosure<GridKernalContext, String> unmarshalClosure(GridKernalContext ctx) throws IgniteCheckedException {
+ assert cBytes != null;
+
+ return U.unmarshal(ctx, cBytes, null);
+ }
+
+ public long futureId() {
+ return futId;
+ }
+
+ public boolean request() {
+ return cBytes != null;
+ }
+
+ public String message() {
+ return msg;
+ }
+
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("cBytes", cBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeString("msg", msg))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cBytes = reader.readByteArray("cBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ msg = reader.readString("msg");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(IgniteDiagnosticMessage.class);
+ }
+
+ @Override public byte directType() {
+ return -46;
+ }
+
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ @Override public void onAckReceived() {
+
+ }
+
+ /**
+ *
+ */
+ public static class BaseClosure implements IgniteClosure<GridKernalContext, String> {
+ /** */
+ protected final UUID nodeId;
+
+ /**
+ * @param ctx Local node context.
+ */
+ public BaseClosure(GridKernalContext ctx) {
+ this.nodeId = ctx.localNodeId();
+ }
+
+ @Override public String apply(GridKernalContext ctx) {
+ try {
+ StringBuilder sb = new StringBuilder();
+
+ IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx, nodeId);
+
+ sb.append(dumpNodeBasicInfo(ctx));
+
+ sb.append(U.nl()).append(dumpExchangeInfo(ctx));
+
+ String moreInfo = dumpInfo(ctx);
+
+ sb.append(U.nl()).append(commInfo.get());
+
+ if (moreInfo != null)
+ sb.append(U.nl()).append(moreInfo);
+
+ return sb.toString();
+ }
+ catch (Exception e) {
+ ctx.cluster().diagnosticLog().error("Failed to execute diagnostic message closure: " + e, e);
+
+ return "Failed to execute diagnostic message closure: " + e;
+ }
+ }
+
+ protected String dumpInfo(GridKernalContext ctx) {
+ return null;
+ }
+ }
+
+ public static String dumpNodeBasicInfo(GridKernalContext ctx) {
+ StringBuilder sb = new StringBuilder("General node info [id=").append(ctx.localNodeId());
+
+ sb.append(", client=").append(ctx.clientNode());
+ sb.append(", discoTopVer=").append(ctx.discovery().topologyVersionEx());
+ sb.append(", time=").append(formatTime(U.currentTimeMillis()));
+
+ sb.append(']');
+
+ return sb.toString();
+ }
+
+ static String dumpExchangeInfo(GridKernalContext ctx) {
+ GridCachePartitionExchangeManager exchMgr = ctx.cache().context().exchange();
+
+ StringBuilder sb = new StringBuilder("Partitions exchange info [readyVer=").append(exchMgr.readyAffinityVersion());
+ sb.append("]");
+
+ GridDhtTopologyFuture fut = exchMgr.lastTopologyFuture();
+
+ sb.append(U.nl()).append("Last initialized exchange future: ").append(fut);
+
+ return sb.toString();
+ }
+
+ public static IgniteInternalFuture<String> dumpCommunicationInfo(GridKernalContext ctx, UUID nodeId) {
+ if (ctx.config().getCommunicationSpi() instanceof TcpCommunicationSpi)
+ return ((TcpCommunicationSpi) ctx.config().getCommunicationSpi()).dumpNodeStatistics(nodeId);
+ else
+ return new GridFinishedFuture<>("Unexpected communication SPI: " + ctx.config().getCommunicationSpi());
+ }
+ /**
+ * @param time Time.
+ * @return Time string.
+ */
+ private static String formatTime(long time) {
+ return dateFormat.get().format(new Date(time));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 391509e..0a95f91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -911,6 +911,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
provider.start(ctx.plugins().pluginContextForProvider(provider));
}
+ ctx.cluster().initListeners();
+
fillNodeAttributes(clusterProc.updateNotifierEnabled());
}
catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f73682..f443f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridJobSiblingsRequest;
import org.apache.ignite.internal.GridJobSiblingsResponse;
import org.apache.ignite.internal.GridTaskCancelRequest;
import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
@@ -172,6 +173,10 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -46:
+ msg = new IgniteDiagnosticMessage();
+
+ break;
case -45:
msg = new GridChangeGlobalStateMessageResponse();
http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 124cb4b..0d0aead 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -23,22 +23,42 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteProperties;
import org.apache.ignite.internal.cluster.IgniteClusterImpl;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridTimerTask;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
/**
@@ -46,6 +66,9 @@ import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
*/
public class ClusterProcessor extends GridProcessorAdapter {
/** */
+ public static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic";
+
+ /** */
private static final String ATTR_UPDATE_NOTIFIER_STATUS = "UPDATE_NOTIFIER_STATUS";
/** Periodic version check delay. */
@@ -68,6 +91,15 @@ public class ClusterProcessor extends GridProcessorAdapter {
@GridToStringExclude
private GridUpdateNotifier verChecker;
+ /** */
+ private final IgniteLogger diagnosticLog;
+
+ /** */
+ private final AtomicReference<ConcurrentHashMap<Long, InternalDiagnosticFuture>> diagnosticFutMap = new AtomicReference<>();
+
+ /** */
+ private final AtomicLong diagFutId = new AtomicLong();
+
/**
* @param ctx Kernal context.
*/
@@ -78,6 +110,82 @@ public class ClusterProcessor extends GridProcessorAdapter {
Boolean.parseBoolean(IgniteProperties.get("ignite.update.notifier.enabled.by.default"))));
cluster = new IgniteClusterImpl(ctx);
+
+ diagnosticLog = ctx.log(DIAGNOSTIC_LOG_CATEGORY);
+ }
+
+ public void initListeners() throws IgniteCheckedException {
+ ctx.event().addLocalEventListener(new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt instanceof DiscoveryEvent;
+ assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID nodeId = discoEvt.eventNode().id();
+
+ ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get();
+
+ if (futs != null) {
+ for (InternalDiagnosticFuture fut : futs.values()) {
+ if (fut.nodeId.equals(nodeId))
+ fut.onDone("Target node failed: " + nodeId);
+ }
+ }
+ }
+ },
+ EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ ctx.io().addMessageListener(GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ return;
+
+ if (msg instanceof IgniteDiagnosticMessage) {
+ IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
+
+ if (msg0.request()) {
+ String resMsg;
+
+ IgniteClosure<GridKernalContext, String> c = null;
+
+ try {
+ c = msg0.unmarshalClosure(ctx);
+
+ resMsg = c.apply(ctx);
+ }
+ catch (Exception e) {
+ U.error(diagnosticLog, "Failed to run diagnostic closure: " + e, e);
+
+ resMsg = "Failed to run diagnostic closure: " + e;
+ }
+
+ IgniteDiagnosticMessage res = IgniteDiagnosticMessage.createResponse(resMsg, msg0.futureId());
+
+ try {
+ ctx.io().send(node, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(diagnosticLog, "Failed to send diagnostic response [msg=" + msg0 + "]", e);
+ }
+ }
+ else {
+ InternalDiagnosticFuture fut = diagnosticFuturesMap().get(msg0.futureId());
+
+ if (fut != null)
+ fut.onResponse(msg0);
+ else
+ U.warn(diagnosticLog, "Failed to find diagnostic message future [msg=" + msg0 + ']');
+ }
+ }
+ }
+ });
+ }
+
+ public IgniteLogger diagnosticLog() {
+ return diagnosticLog;
}
/**
@@ -177,6 +285,89 @@ public class ClusterProcessor extends GridProcessorAdapter {
return verChecker != null ? verChecker.latestVersion() : null;
}
+ public IgniteInternalFuture<String> diagnosticInfo(final UUID nodeId,
+ IgniteClosure<GridKernalContext, String> c,
+ final String msg) {
+ final GridFutureAdapter<String> infoFut = new GridFutureAdapter<>();
+
+ final IgniteInternalFuture<String> rmtFut = sendDiagnosticMessage(nodeId, c);
+
+ rmtFut.listen(new CI1<IgniteInternalFuture<String>>() {
+ @Override public void apply(IgniteInternalFuture<String> fut) {
+ String rmtMsg;
+
+ try {
+ rmtMsg = fut.get();
+ }
+ catch (Exception e) {
+ rmtMsg = "Diagnostic processing error: " + e;
+ }
+
+ final String rmtMsg0 = rmtMsg;
+
+ IgniteInternalFuture<String> locFut = IgniteDiagnosticMessage.dumpCommunicationInfo(ctx, nodeId);
+
+ locFut.listen(new CI1<IgniteInternalFuture<String>>() {
+ @Override public void apply(IgniteInternalFuture<String> locFut) {
+ String locMsg;
+
+ try {
+ locMsg = locFut.get();
+ }
+ catch (Exception e) {
+ locMsg = "Failed to get info for local node: " + e;
+ }
+
+ StringBuilder sb = new StringBuilder(msg);
+
+ sb.append(U.nl());
+ sb.append("Remote node information:").append(U.nl()).append(rmtMsg0);
+
+ sb.append(U.nl()).append("Local communication statistics:").append(U.nl());
+ sb.append(locMsg);
+
+ infoFut.onDone(sb.toString());
+ }
+ });
+ }
+ });
+
+ return infoFut;
+ }
+
+ private IgniteInternalFuture<String> sendDiagnosticMessage(UUID nodeId, IgniteClosure<GridKernalContext, String> c) {
+ try {
+ IgniteDiagnosticMessage msg = IgniteDiagnosticMessage.createRequest(ctx, c, diagFutId.getAndIncrement());
+
+ InternalDiagnosticFuture fut = new InternalDiagnosticFuture(nodeId, msg.futureId());
+
+ diagnosticFuturesMap().put(msg.futureId(), fut);
+
+ ctx.io().send(nodeId, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL);
+
+ return fut;
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to send diagnostic message: " + e);
+
+ return new GridFinishedFuture<>("Failed to send diagnostic message: " + e);
+ }
+ }
+
+ /**
+ * @return Diagnostic messages futures map.
+ */
+ private ConcurrentHashMap<Long, InternalDiagnosticFuture> diagnosticFuturesMap() {
+ ConcurrentHashMap<Long, InternalDiagnosticFuture> map = diagnosticFutMap.get();
+
+ if (map == null) {
+ if (!diagnosticFutMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
+ map = diagnosticFutMap.get();
+ }
+
+ return map;
+ }
+
/**
* Update notifier timer task.
*/
@@ -245,4 +436,37 @@ public class ClusterProcessor extends GridProcessorAdapter {
}
}
}
+
+ /**
+ *
+ */
+ class InternalDiagnosticFuture extends GridFutureAdapter<String> {
+ /** */
+ private final long id;
+
+ /** */
+ private final UUID nodeId;
+
+ /**
+ * @param id Future ID.
+ */
+ public InternalDiagnosticFuture(UUID nodeId, long id) {
+ this.nodeId = nodeId;
+ this.id = id;
+ }
+
+ public void onResponse(IgniteDiagnosticMessage msg) {
+ onDone(msg.message());
+ }
+
+ @Override public boolean onDone(@Nullable String res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ diagnosticFuturesMap().remove(id);
+
+ return true;
+ }
+
+ return false;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a59adba..2ea078f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -44,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -55,6 +56,7 @@ import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -709,6 +712,45 @@ public class GridNioServer<T> {
clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS));
}
+ public IgniteInternalFuture<String> dumpNodeStats(final String msg, IgnitePredicate<GridNioSession> p) {
+ GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new IgniteReducer<String, String>() {
+ private final StringBuilder sb = new StringBuilder(msg);
+
+ @Override public boolean collect(@Nullable String msg) {
+ if (!F.isEmpty(msg)) {
+ synchronized (sb) {
+ if (sb.length() > 0)
+ sb.append(U.nl());
+
+ sb.append(msg);
+ }
+ }
+
+ return true;
+ }
+
+ @Override public String reduce() {
+ synchronized (sb) {
+ return sb.toString();
+ }
+ }
+ });
+
+ for (int i = 0; i < clientWorkers.size(); i++) {
+ NioOperationFuture<String> opFut = new NioOperationFuture<>(null, NioOperation.DUMP_STATS);
+
+ opFut.msg = p;
+
+ clientWorkers.get(i).offer(opFut);
+
+ fut.add(opFut);
+ }
+
+ fut.markInitialized();
+
+ return fut;
+ }
+
/**
* Establishes a session.
*
@@ -1807,12 +1849,28 @@ public class GridNioServer<T> {
case DUMP_STATS: {
NioOperationFuture req = (NioOperationFuture)req0;
- try {
- dumpStats();
+ if (req.msg instanceof IgnitePredicate) {
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ dumpStats(sb, (IgnitePredicate<GridNioSession>)req.msg);
+ }
+ finally {
+ req.onDone(sb.toString());
+ }
}
- finally {
- // Complete the request just in case (none should wait on this future).
- req.onDone(true);
+ else {
+ try {
+ StringBuilder sb = new StringBuilder();
+
+ dumpStats(sb, null);
+
+ U.warn(log, sb.toString());
+ }
+ finally {
+ // Complete the request just in case (none should wait on this future).
+ req.onDone(true);
+ }
}
}
}
@@ -1919,81 +1977,111 @@ public class GridNioServer<T> {
}
}
- /**
- *
- */
- private void dumpStats() {
- StringBuilder sb = new StringBuilder();
-
- Set<SelectionKey> keys = selector.keys();
-
- sb.append(U.nl())
- .append(">> Selector info [idx=").append(idx)
+ private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) {
+ sb.append(">> Selector info [idx=").append(idx)
.append(", keysCnt=").append(keys.size())
.append(", bytesRcvd=").append(bytesRcvd)
.append(", bytesRcvd0=").append(bytesRcvd0)
.append(", bytesSent=").append(bytesSent)
.append(", bytesSent0=").append(bytesSent0)
.append("]").append(U.nl());
+ }
+
+ /**
+ *
+ */
+ private void dumpStats(StringBuilder sb, @Nullable IgnitePredicate<GridNioSession> p) {
+ Set<SelectionKey> keys = selector.keys();
+
+ boolean selInfo = p == null;
+
+ if (selInfo)
+ dumpSelectorInfo(sb, keys);
for (SelectionKey key : keys) {
GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
- MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
- MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+ boolean sesInfo = p == null || p.apply(ses);
- sb.append(" Connection info [")
- .append("in=").append(ses.accepted())
- .append(", rmtAddr=").append(ses.remoteAddress())
- .append(", locAddr=").append(ses.localAddress());
+ if (sesInfo) {
+ if (!selInfo) {
+ dumpSelectorInfo(sb, keys);
- GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+ selInfo = true;
+ }
- if (outDesc != null) {
- sb.append(", msgsSent=").append(outDesc.sent())
- .append(", msgsAckedByRmt=").append(outDesc.acked())
- .append(", descIdHash=").append(System.identityHashCode(outDesc));
- }
- else
- sb.append(", outRecoveryDesc=null");
+ MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+ MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
- GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+ sb.append(" Connection info [")
+ .append("in=").append(ses.accepted())
+ .append(", rmtAddr=").append(ses.remoteAddress())
+ .append(", locAddr=").append(ses.localAddress());
- if (inDesc != null) {
- sb.append(", msgsRcvd=").append(inDesc.received())
- .append(", lastAcked=").append(inDesc.lastAcknowledged())
- .append(", descIdHash=").append(System.identityHashCode(inDesc));
- }
- else
- sb.append(", inRecoveryDesc=null");
+ GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+
+ if (outDesc != null) {
+ sb.append(", msgsSent=").append(outDesc.sent())
+ .append(", msgsAckedByRmt=").append(outDesc.acked())
+ .append(", descIdHash=").append(System.identityHashCode(outDesc));
+
+ if (!outDesc.messagesRequests().isEmpty()) {
+ int cnt = 0;
+
+ sb.append(", unackedMsgs=[");
- sb.append(", bytesRcvd=").append(ses.bytesReceived())
- .append(", bytesRcvd0=").append(ses.bytesReceived0())
- .append(", bytesSent=").append(ses.bytesSent())
- .append(", bytesSent0=").append(ses.bytesSent0())
- .append(", opQueueSize=").append(ses.writeQueueSize())
- .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
- .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+ for (SessionWriteRequest req : outDesc.messagesRequests()) {
+ if (cnt != 0)
+ sb.append(", ");
- int cnt = 0;
+ sb.append(req.message());
- for (SessionWriteRequest req : ses.writeQueue()) {
- if (cnt == 0)
- sb.append(",\n opQueue=[").append(req);
+ if (++cnt == 5)
+ break;
+ }
+
+ sb.append(']');
+ }
+ }
else
- sb.append(',').append(req);
+ sb.append(", outRecoveryDesc=null");
- if (++cnt == 5) {
- sb.append(']');
+ GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
- break;
+ if (inDesc != null) {
+ sb.append(", msgsRcvd=").append(inDesc.received())
+ .append(", lastAcked=").append(inDesc.lastAcknowledged())
+ .append(", descIdHash=").append(System.identityHashCode(inDesc));
}
- }
+ else
+ sb.append(", inRecoveryDesc=null");
- sb.append("]").append(U.nl());
- }
+ sb.append(", bytesRcvd=").append(ses.bytesReceived())
+ .append(", bytesRcvd0=").append(ses.bytesReceived0())
+ .append(", bytesSent=").append(ses.bytesSent())
+ .append(", bytesSent0=").append(ses.bytesSent0())
+ .append(", opQueueSize=").append(ses.writeQueueSize())
+ .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+ .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+
+ int cnt = 0;
- U.warn(log, sb.toString());
+ for (SessionWriteRequest req : ses.writeQueue()) {
+ if (cnt == 0)
+ sb.append(",\n opQueue=[").append(req);
+ else
+ sb.append(',').append(req);
+
+ if (++cnt == 5) {
+ sb.append(']');
+
+ break;
+ }
+ }
+
+ sb.append("]");
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/54668f1a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 47498dd..650d25b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -1650,17 +1651,65 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
rcvdBytesCnt.add(-rcvdBytesCnt.sum());
}
+ /**
+ * @param nodeId Target node ID.
+ * @return Future.
+ */
+ public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) {
+ StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl());
+
+ dumpInfo(sb, nodeId);
+
+ GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+ if (nioSrvr != null) {
+ sb.append("NIO sessions statistics:");
+
+ IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>() {
+ @Override public boolean apply(GridNioSession ses) {
+ ConnectionKey connId = ses.meta(CONN_IDX_META);
+
+ return connId != null && nodeId.equals(connId.nodeId());
+ }
+ };
+
+ return nioSrvr.dumpNodeStats(sb.toString(), p);
+ }
+ else {
+ sb.append(U.nl()).append("GridNioServer is null.");
+
+ return new GridFinishedFuture<>(sb.toString());
+ }
+ }
+
/** {@inheritDoc} */
@Override public void dumpStats() {
IgniteLogger log = this.log;
if (log != null) {
- StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
+ StringBuilder sb = new StringBuilder();
+
+ dumpInfo(sb, null);
+
+ U.warn(log, sb.toString());
+ }
+
+ GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+ if (nioSrvr != null)
+ nioSrvr.dumpStats();
+ }
+
+ private void dumpInfo(StringBuilder sb, UUID dstNodeId) {
+ sb.append("Communication SPI recovery descriptors: ").append(U.nl());
+
+ for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+ GridNioRecoveryDescriptor desc = entry.getValue();
- for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
- GridNioRecoveryDescriptor desc = entry.getValue();
+ if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+ continue;
- sb.append(" [key=").append(entry.getKey())
+ sb.append(" [key=").append(entry.getKey())
.append(", msgsSent=").append(desc.sent())
.append(", msgsAckedByRmt=").append(desc.acked())
.append(", msgsRcvd=").append(desc.received())
@@ -1668,12 +1717,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(", reserveCnt=").append(desc.reserveCount())
.append(", descIdHash=").append(System.identityHashCode(desc))
.append(']').append(U.nl());
- }
+ }
+
+ for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
+ GridNioRecoveryDescriptor desc = entry.getValue();
- for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
- GridNioRecoveryDescriptor desc = entry.getValue();
+ if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+ continue;
- sb.append(" [key=").append(entry.getKey())
+ sb.append(" [key=").append(entry.getKey())
.append(", msgsSent=").append(desc.sent())
.append(", msgsAckedByRmt=").append(desc.acked())
.append(", reserveCnt=").append(desc.reserveCount())
@@ -1681,12 +1733,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(", reserved=").append(desc.reserved())
.append(", descIdHash=").append(System.identityHashCode(desc))
.append(']').append(U.nl());
- }
+ }
+
+ for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
+ GridNioRecoveryDescriptor desc = entry.getValue();
- for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
- GridNioRecoveryDescriptor desc = entry.getValue();
+ if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+ continue;
- sb.append(" [key=").append(entry.getKey())
+ sb.append(" [key=").append(entry.getKey())
.append(", msgsRcvd=").append(desc.received())
.append(", lastAcked=").append(desc.lastAcknowledged())
.append(", reserveCnt=").append(desc.reserveCount())
@@ -1695,30 +1750,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(", handshakeIdx=").append(desc.handshakeIndex())
.append(", descIdHash=").append(System.identityHashCode(desc))
.append(']').append(U.nl());
- }
+ }
- sb.append("Communication SPI clients: ").append(U.nl());
+ sb.append("Communication SPI clients: ").append(U.nl());
- for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
- UUID nodeId = entry.getKey();
- GridCommunicationClient[] clients0 = entry.getValue();
+ for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
+ UUID clientNodeId = entry.getKey();
- for (GridCommunicationClient client : clients0) {
- if (client != null) {
- sb.append(" [node=").append(nodeId)
+ if (dstNodeId != null && !dstNodeId.equals(clientNodeId))
+ continue;
+
+ GridCommunicationClient[] clients0 = entry.getValue();
+
+ for (GridCommunicationClient client : clients0) {
+ if (client != null) {
+ sb.append(" [node=").append(clientNodeId)
.append(", client=").append(client)
.append(']').append(U.nl());
- }
}
}
-
- U.warn(log, sb.toString());
}
-
- GridNioServer<Message> nioSrvr = this.nioSrvr;
-
- if (nioSrvr != null)
- nioSrvr.dumpStats();
}
/** */