You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/01 13:44:57 UTC
[05/53] [abbrv] incubator-ignite git commit: # IGNITE-943 Merge
TcpDiscoverySpi and TcpClientDiscoverySpi
# IGNITE-943 Merge TcpDiscoverySpi and TcpClientDiscoverySpi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/838c0fd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/838c0fd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/838c0fd8
Branch: refs/heads/ignite-sprint-5
Commit: 838c0fd83b67b5e906144777d7340d5e61bdfa8a
Parents: 2f169f5
Author: sevdokimov <se...@gridgain.com>
Authored: Wed May 27 16:18:37 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Wed May 27 16:18:37 2015 +0300
----------------------------------------------------------------------
.../main/java/org/apache/ignite/Ignition.java | 9 +-
.../org/apache/ignite/cluster/ClusterNode.java | 6 +-
.../configuration/IgniteConfiguration.java | 4 +-
.../apache/ignite/internal/IgniteKernal.java | 5 +-
.../org/apache/ignite/internal/IgnitionEx.java | 14 +-
.../ignite/spi/discovery/DiscoverySpi.java | 17 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 1481 +++++
.../ignite/spi/discovery/tcp/ServerImpl.java | 4792 +++++++++++++++
.../spi/discovery/tcp/SocketMultiConnector.java | 2 +-
.../discovery/tcp/TcpClientDiscoverySpi.java | 1573 -----
.../tcp/TcpClientDiscoverySpiMBean.java | 156 -
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 175 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 5777 ++++--------------
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 1185 ----
.../spi/discovery/tcp/TcpDiscoverySpiMBean.java | 8 +
.../internal/GridReleaseTypeSelfTest.java | 16 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 17 +-
.../GridDiscoveryManagerAttributesSelfTest.java | 21 +-
.../discovery/GridDiscoveryManagerSelfTest.java | 21 +-
...acheTcpClientDiscoveryMultiThreadedTest.java | 8 +-
.../IgniteClientDataStructuresAbstractTest.java | 9 +-
...ientModesTcpClientDiscoveryAbstractTest.java | 10 +-
...unctionExcludeNeighborsAbstractSelfTest.java | 5 +-
.../GridCacheSyncReplicatedPreloadSelfTest.java | 1 -
...pClientDiscoveryMarshallerCheckSelfTest.java | 9 +-
.../TcpClientDiscoverySpiConfigSelfTest.java | 39 -
.../tcp/TcpClientDiscoverySpiSelfTest.java | 44 +-
.../tcp/TcpDiscoveryConcurrentStartTest.java | 4 +-
.../tcp/TcpDiscoveryMultiThreadedTest.java | 8 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +-
.../tcp/TcpDiscoverySpiConfigSelfTest.java | 8 +
.../testframework/junits/GridAbstractTest.java | 8 +-
.../IgniteSpiDiscoverySelfTestSuite.java | 1 -
33 files changed, 7693 insertions(+), 7742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/Ignition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java
index 3270f5c..35e0b51 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignition.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java
@@ -20,6 +20,7 @@ package org.apache.ignite;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.jetbrains.annotations.*;
@@ -138,11 +139,11 @@ public class Ignition {
* <p>
* This flag used when node is started if {@link IgniteConfiguration#isClientMode()}
* is {@code null}. When {@link IgniteConfiguration#isClientMode()} is set this flag is ignored.
- * It is recommended to use {@link TcpClientDiscoverySpi} on client nodes.
+ * It is recommended to use {@link DiscoverySpi} in client mode too.
*
* @param clientMode Client mode flag.
* @see IgniteConfiguration#isClientMode()
- * @see TcpClientDiscoverySpi
+ * @see TcpDiscoverySpi#setClientMode(boolean)
*/
public static void setClientMode(boolean clientMode) {
IgnitionEx.setClientMode(clientMode);
@@ -153,11 +154,11 @@ public class Ignition {
* <p>
* This flag used when node is started if {@link IgniteConfiguration#isClientMode()}
* is {@code null}. When {@link IgniteConfiguration#isClientMode()} is set this flag is ignored.
- * It is recommended to use {@link TcpClientDiscoverySpi} on client nodes.
+ * It is recommended to use {@link DiscoverySpi} in client mode too.
*
* @return Client mode flag.
* @see IgniteConfiguration#isClientMode()
- * @see TcpClientDiscoverySpi
+ * @see TcpDiscoverySpi#setClientMode(boolean)
*/
public static boolean isClientMode() {
return IgnitionEx.isClientMode();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 13dc30a..8f56372 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -20,7 +20,7 @@ package org.apache.ignite.cluster;
import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -243,12 +243,12 @@ public interface ClusterNode {
* Tests whether or not this node is connected to cluster as a client.
* <p>
* Do not confuse client in terms of
- * discovery {@link TcpClientDiscoverySpi} and client in terms of cache
+ * discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache
* {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data,
* while topology clients connect to topology in a different way.
*
* @return {@code True} if this node is a client node, {@code false} otherwise.
- * @see TcpClientDiscoverySpi
+ * @see DiscoverySpi#isClientMode()
* @see IgniteConfiguration#isClientMode()
* @see Ignition#isClientMode()
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index e47d4b1..7ddfd71 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -1824,10 +1824,10 @@ public class IgniteConfiguration {
/**
* Gets client mode flag. Client node cannot hold data in the caches. It's recommended to use
- * {@link TcpClientDiscoverySpi} on client nodes.
+ * {@link DiscoverySpi} in client mode if this property is {@code true}.
*
* @return Client mode flag.
- * @see TcpClientDiscoverySpi
+ * @see TcpDiscoverySpi#setClientMode(boolean)
*/
public Boolean isClientMode() {
return clientMode;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 84d81d7..d6e3ca4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -71,7 +71,6 @@ import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.mxbean.*;
import org.apache.ignite.plugin.*;
import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.tcp.*;
import org.jetbrains.annotations.*;
import javax.management.*;
@@ -1073,8 +1072,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (cfg.getIncludeEventTypes() != null && cfg.getIncludeEventTypes().length != 0)
perf.add("Disable grid events (remove 'includeEventTypes' from configuration)");
- if (Boolean.TRUE.equals(cfg.isClientMode()) && cfg.getDiscoverySpi() instanceof TcpDiscoverySpi)
- perf.add("Use TcpClientDiscoverySpi instead of TcpDiscoverySpi to run client node");
+ if (Boolean.TRUE.equals(cfg.isClientMode()) && !cfg.getDiscoverySpi().isClientMode())
+ perf.add("Use TcpDiscoverySpi in client mode for client node");
if (OptimizedMarshaller.available() && !(cfg.getMarshaller() instanceof OptimizedMarshaller))
perf.add("Enable optimized marshaller (set 'marshaller' to " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 728fce6..13b015b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1744,8 +1744,8 @@ public class IgnitionEx {
}
if (myCfg.isClientMode() == null || !myCfg.isClientMode()) {
- if (myCfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi) {
- throw new IgniteCheckedException("TcpClientDiscoverySpi can be used in client mode only" +
+ if (myCfg.getDiscoverySpi().isClientMode()) {
+ throw new IgniteCheckedException("DiscoverySpi is in client mode, but node is not in client mode" +
"(consider changing 'IgniteConfiguration.clientMode' to 'true').");
}
}
@@ -1815,14 +1815,14 @@ public class IgnitionEx {
*/
private void initializeDefaultSpi(IgniteConfiguration cfg) {
if (cfg.getDiscoverySpi() == null) {
+ cfg.setDiscoverySpi(new TcpDiscoverySpi());
+
if (cfg.isClientMode() != null && cfg.isClientMode())
- cfg.setDiscoverySpi(new TcpClientDiscoverySpi());
- else
- cfg.setDiscoverySpi(new TcpDiscoverySpi());
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientMode(true);
}
- if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpiAdapter) {
- TcpDiscoverySpiAdapter tcpDisco = (TcpDiscoverySpiAdapter)cfg.getDiscoverySpi();
+ if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpi) {
+ TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)cfg.getDiscoverySpi();
if (tcpDisco.getIpFinder() == null)
tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 7836e0f..4996d16 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -77,11 +78,10 @@ public interface DiscoverySpi extends IgniteSpi {
/**
* Sets node attributes and node version which will be distributed in grid during
* join process. Note that these attributes cannot be changed and set only once.
- *
- * @param attrs Map of node attributes.
+ * @param attrs Map of node attributes.
* @param ver Product version.
*/
- public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver);
+ public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver);
/**
* Sets a listener for discovery events. Refer to
@@ -102,7 +102,7 @@ public interface DiscoverySpi extends IgniteSpi {
*
* @param exchange Discovery data exchange handler.
*/
- public void setDataExchange(DiscoverySpiDataExchange exchange);
+ public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange);
/**
* Sets discovery metrics provider. Use metrics provided by
@@ -111,7 +111,7 @@ public interface DiscoverySpi extends IgniteSpi {
*
* @param metricsProvider Provider of metrics data.
*/
- public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
+ public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
/**
* Tells discovery SPI to disconnect from topology. This is very close to calling
@@ -152,4 +152,11 @@ public interface DiscoverySpi extends IgniteSpi {
* @param nodeId Node ID.
*/
public void failNode(UUID nodeId);
+
+ /**
+ * Whether or not discovery is in client mode.
+ *
+ * @return {@code true} if node is in client mode.
+ */
+ public boolean isClientMode();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
new file mode 100644
index 0000000..455b2af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -0,0 +1,1481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+
+/**
+ *
+ */
+class ClientImpl extends TcpDiscoveryImpl {
+ /** Marker object; presumably queued to the message worker on join timeout (usage is outside this view - TODO confirm). */
+ private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
+
+ /** Marker object that {@link #spiStop()} queues to the message worker to request a stop. */
+ private static final Object SPI_STOP = "SPI_STOP";
+
+ /** Marker object; presumably signals a failed reconnect to the message worker (usage is outside this view - TODO confirm). */
+ private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
+
+ /** Remote nodes keyed by node ID. */
+ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
+
+ /** Topology history: topology version mapped to the snapshot of visible nodes recorded for it. */
+ private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+ /** Pending ping futures keyed by the ID of the node being pinged. */
+ private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
+
+ /** Socket writer. */
+ private SocketWriter sockWriter;
+
+ /** Socket reader. */
+ private SocketReader sockReader;
+
+ /** Segmentation flag; while set, {@link #sendCustomEvent(DiscoverySpiCustomMessage)} refuses to send. */
+ private boolean segmented;
+
+ /** ID of the last ensured message received by the socket reader; sent along with reconnect messages. */
+ private volatile IgniteUuid lastMsgId;
+
+ /** Current topology version. */
+ private volatile long topVer;
+
+ /** Join error. Holds the error that occurred during the join process, if any; rethrown by {@link #spiStart(String)}. */
+ private IgniteSpiException joinErr;
+
+ /** Joined latch; {@link #spiStart(String)} blocks on it. */
+ private final CountDownLatch joinLatch = new CountDownLatch(1);
+
+ /** Left latch; {@link #spiStop()} waits on it for at most the network timeout. */
+ private final CountDownLatch leaveLatch = new CountDownLatch(1);
+
+ /** Timer used for periodic heartbeats and for ping timeouts. */
+ private final Timer timer = new Timer("TcpDiscoverySpi.timer");
+
+ /** Message worker. */
+ protected MessageWorker msgWorker;
+
+ /**
+ * Creates the client-side discovery implementation.
+ *
+ * @param adapter Owning discovery SPI; handed to the superclass constructor.
+ */
+ ClientImpl(TcpDiscoverySpi adapter) {
+ super(adapter);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Builds a multi-line report containing the local node ID and node, the status of the internal
+ * threads (message worker, socket reader, socket writer, socket timeout worker), the IDs of all
+ * visible nodes and the SPI statistics, then emits it through {@code U.quietAndInfo}.
+ *
+ * @param log Logger the report is written to.
+ */
+ @Override public void dumpDebugInfo(IgniteLogger log) {
+ StringBuilder b = new StringBuilder(U.nl());
+
+ b.append(">>>").append(U.nl());
+ b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl());
+ b.append(">>>").append(U.nl());
+
+ b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl());
+ b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl());
+
+ b.append("Internal threads: ").append(U.nl());
+
+ b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
+ b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
+ b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
+ b.append(" Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Nodes: ").append(U.nl());
+
+ for (ClusterNode node : allVisibleNodes())
+ b.append(" ").append(node.id()).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Stats: ").append(adapter.stats).append(U.nl());
+
+ U.quietAndInfo(log, b.toString());
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The state is derived from the socket writer: {@code "connected"} while it holds a socket,
+ * {@code "disconnected"} otherwise.
+ */
+ @Override public String getSpiState() {
+     return sockWriter.isOnline() ? "connected" : "disconnected";
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Queue size as reported by the message worker.
+ */
+ @Override public int getMessageWorkerQueueSize() {
+ return msgWorker.queueSize();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code null} in this client implementation.
+ */
+ @Override public UUID getCoordinator() {
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Initializes the local node through the adapter, starts the socket writer, the socket reader and
+ * the message worker, and then blocks until the join latch is released. If a join error was
+ * recorded in the meantime it is rethrown. On success the heartbeat sender is scheduled on the
+ * timer with the adapter's heartbeat frequency as both delay and period.
+ *
+ * @param gridName Grid name (not used by this method).
+ * @throws IgniteSpiException If the join failed or the calling thread was interrupted while waiting for it.
+ */
+ @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+ adapter.initLocalNode(0, true);
+
+ locNode = adapter.locNode;
+
+ sockWriter = new SocketWriter();
+ sockWriter.start();
+
+ sockReader = new SocketReader();
+ sockReader.start();
+
+ msgWorker = new MessageWorker();
+ msgWorker.start();
+
+ try {
+ joinLatch.await();
+
+ if (joinErr != null)
+ throw joinErr;
+ }
+ catch (InterruptedException e) {
+ throw new IgniteSpiException("Thread has been interrupted.", e);
+ }
+
+ timer.schedule(new HeartbeatSender(), adapter.hbFreq, adapter.hbFreq);
+
+ adapter.printStartInfo();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Cancels the timer and, if the message worker is alive, queues the {@code SPI_STOP} marker and
+ * waits up to the adapter's network timeout for the leave latch, logging a warning on timeout.
+ * Afterwards all pending ping futures are completed with {@code false}, the remote node map is
+ * cleared, and the message worker, socket writer and socket reader are interrupted and joined.
+ *
+ * @throws IgniteSpiException Declared by the overridden method; not thrown directly here.
+ */
+ @Override public void spiStop() throws IgniteSpiException {
+ timer.cancel();
+
+ if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive
+ msgWorker.addMessage(SPI_STOP);
+
+ try {
+ if (!leaveLatch.await(adapter.netTimeout, MILLISECONDS))
+ U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
+ }
+ catch (InterruptedException ignored) { // Interruption while waiting is ignored; shutdown proceeds.
+
+ }
+ }
+
+ for (GridFutureAdapter<Boolean> fut : pingFuts.values())
+ fut.onDone(false);
+
+ rmtNodes.clear();
+
+ U.interrupt(msgWorker);
+ U.interrupt(sockWriter);
+ U.interrupt(sockReader);
+
+ U.join(msgWorker, log);
+ U.join(sockWriter, log);
+ U.join(sockReader, log);
+
+ adapter.printStopInfo();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The client implementation does nothing here.
+ *
+ * @param spiCtx SPI context (ignored).
+ */
+ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return List built by {@code U.arrayList} from the known remote nodes using the
+ *      {@code TcpDiscoveryNodesRing.VISIBLE_NODES} predicate.
+ */
+ @Override public Collection<ClusterNode> getRemoteNodes() {
+ return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param nodeId ID of the node to look up.
+ * @return The local node when the ID is the local one, the matching remote node when it is known
+ *      and visible, {@code null} otherwise.
+ */
+ @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+     if (!getLocalNodeId().equals(nodeId)) {
+         TcpDiscoveryNode rmt = rmtNodes.get(nodeId);
+
+         if (rmt == null || !rmt.visible())
+             return null;
+
+         return rmt;
+     }
+
+     return locNode;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The local node always answers {@code true}; an unknown or invisible remote node answers
+ * {@code false}. Otherwise a single ping future is shared per node ID: the caller that registers
+ * the future schedules a timer task that completes it with {@code false} after the adapter's
+ * network timeout and queues a {@code TcpDiscoveryClientPingRequest} on the socket writer. Every
+ * caller then waits on the shared future. Successful completion of the future happens outside
+ * this method (presumably in the message worker when the response arrives - TODO confirm).
+ *
+ * @param nodeId ID of the node to ping.
+ * @return Ping result, or {@code false} if the SPI is stopping or the wait was interrupted.
+ */
+ @Override public boolean pingNode(@NotNull final UUID nodeId) {
+ if (nodeId.equals(getLocalNodeId()))
+ return true;
+
+ TcpDiscoveryNode node = rmtNodes.get(nodeId);
+
+ if (node == null || !node.visible())
+ return false;
+
+ GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
+
+ if (fut == null) {
+ fut = new GridFutureAdapter<>();
+
+ GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
+
+ if (oldFut != null)
+ fut = oldFut;
+ else {
+ if (adapter.getSpiContext().isStopping()) {
+ if (pingFuts.remove(nodeId, fut))
+ fut.onDone(false);
+
+ return false;
+ }
+
+ final GridFutureAdapter<Boolean> finalFut = fut;
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (pingFuts.remove(nodeId, finalFut))
+ finalFut.onDone(false);
+ }
+ }, adapter.netTimeout);
+
+ sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+ }
+ }
+
+ try {
+ return fut.get();
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ return false;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException(e); // Should never occur
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Interrupts and joins the message worker, socket writer and socket reader, releases both the
+ * leave and the join latch and deregisters ports through the SPI context. If a discovery
+ * listener is set, every remote node is removed one by one; for each of them the topology
+ * version is advanced by one and an {@code EVT_NODE_FAILED} event is delivered together with the
+ * updated topology snapshot and a copy of the topology history. Finally the remote node map is
+ * cleared.
+ *
+ * @throws IgniteSpiException Declared by the overridden method; not thrown directly here.
+ */
+ @Override public void disconnect() throws IgniteSpiException {
+ U.interrupt(msgWorker);
+ U.interrupt(sockWriter);
+ U.interrupt(sockReader);
+
+ U.join(msgWorker, log);
+ U.join(sockWriter, log);
+ U.join(sockReader, log);
+
+ leaveLatch.countDown();
+ joinLatch.countDown();
+
+ adapter.getSpiContext().deregisterPorts();
+
+ Collection<ClusterNode> rmts = getRemoteNodes();
+
+ // This is restart/disconnection and remote nodes are not empty.
+ // We need to fire FAIL event for each.
+ DiscoverySpiListener lsnr = adapter.lsnr;
+
+ if (lsnr != null) {
+ for (ClusterNode n : rmts) {
+ rmtNodes.remove(n.id());
+
+ Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
+
+ lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
+ }
+ }
+
+ rmtNodes.clear();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The client implementation ignores the authenticator.
+ *
+ * @param auth Node authenticator (ignored).
+ */
+ @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Marshals the event with the adapter's marshaller, wraps it into a
+ * {@code TcpDiscoveryCustomEventMessage} and queues it on the socket writer.
+ *
+ * @param evt Custom message to send.
+ * @throws IgniteException If the client is marked as segmented.
+ * @throws IgniteSpiException If the event cannot be marshalled.
+ */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+ if (segmented)
+ throw new IgniteException("Failed to send custom message: client is disconnected");
+
+ try {
+ sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
+ adapter.marsh.marshal(evt)));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Nothing happens when the node is not among the known remote nodes; otherwise a
+ * {@code TcpDiscoveryNodeFailedMessage} for it is handed to the message worker.
+ *
+ * @param nodeId ID of the node to fail.
+ */
+ @Override public void failNode(UUID nodeId) {
+     ClusterNode failed = rmtNodes.get(nodeId);
+
+     if (failed == null)
+         return;
+
+     msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), failed.id(), failed.order()));
+ }
+
+ /**
+ * Tries to join (or rejoin) the topology through one of the addresses supplied by the IP finder.
+ * <p>
+ * Addresses are resolved until at least one is available and are then tried in turn: a handshake
+ * is performed, a join request (or a reconnect message when {@code recon} is set) is written and
+ * the receipt is read. Addresses that fail with an I/O or checked Ignite error are dropped from
+ * the current candidate list. When a node answers {@code RES_CONTINUE_JOIN} or {@code RES_WAIT}
+ * the whole round is repeated after a 2000 ms pause, and the join timeout is checked before that
+ * pause, so a cluster that keeps asking the client to wait can neither make it spin without
+ * delay nor block it beyond the configured timeout.
+ *
+ * @param recon {@code true} to send a reconnect message carrying the last received message ID,
+ *      {@code false} to send a regular join request.
+ * @return Opened socket or {@code null} if timeout.
+ * @throws IgniteSpiException Declared for failures propagated from the adapter.
+ * @throws InterruptedException If the current thread is interrupted.
+ * @see TcpDiscoverySpi#joinTimeout
+ */
+ @SuppressWarnings("BusyWait")
+ @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
+     Collection<InetSocketAddress> addrs = null;
+
+     long startTime = U.currentTimeMillis();
+
+     while (true) {
+         if (Thread.currentThread().isInterrupted())
+             throw new InterruptedException();
+
+         while (addrs == null || addrs.isEmpty()) {
+             addrs = adapter.resolvedAddresses();
+
+             if (!F.isEmpty(addrs)) {
+                 if (log.isDebugEnabled())
+                     log.debug("Resolved addresses from IP finder: " + addrs);
+             }
+             else {
+                 U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + adapter.ipFinder);
+
+                 if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout)
+                     return null;
+
+                 Thread.sleep(2000);
+             }
+         }
+
+         Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
+
+         Iterator<InetSocketAddress> it = addrs.iterator();
+
+         // Set when at least one node asked the client to retry later.
+         boolean wait = false;
+
+         while (it.hasNext()) {
+             if (Thread.currentThread().isInterrupted())
+                 throw new InterruptedException();
+
+             InetSocketAddress addr = it.next();
+
+             Socket sock = null;
+
+             try {
+                 long ts = U.currentTimeMillis();
+
+                 IgniteBiTuple<Socket, UUID> t = initConnection(addr);
+
+                 sock = t.get1();
+
+                 UUID rmtNodeId = t.get2();
+
+                 adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+
+                 locNode.clientRouterNodeId(rmtNodeId);
+
+                 TcpDiscoveryAbstractMessage msg = recon ?
+                     new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
+                         lastMsgId) :
+                     new TcpDiscoveryJoinRequestMessage(locNode, adapter.collectExchangeData(getLocalNodeId()));
+
+                 msg.client(true);
+
+                 adapter.writeToSocket(sock, msg);
+
+                 int res = adapter.readReceipt(sock, adapter.ackTimeout);
+
+                 switch (res) {
+                     case RES_OK:
+                         return sock;
+
+                     case RES_CONTINUE_JOIN:
+                     case RES_WAIT:
+                         wait = true;
+
+                         U.closeQuiet(sock);
+
+                         break;
+
+                     default:
+                         if (log.isDebugEnabled())
+                             log.debug("Received unexpected response to join request: " + res);
+
+                         U.closeQuiet(sock);
+                 }
+             }
+             catch (IOException | IgniteCheckedException e) {
+                 if (log.isDebugEnabled())
+                     U.error(log, "Failed to establish connection with address: " + addr, e);
+
+                 U.closeQuiet(sock);
+
+                 it.remove();
+             }
+         }
+
+         if (wait) {
+             // A node is reachable but not ready for us yet: honour the join timeout and back off.
+             if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout)
+                 return null;
+
+             if (log.isDebugEnabled())
+                 log.debug("Will wait before retry join.");
+
+             Thread.sleep(2000);
+         }
+         else if (addrs.isEmpty()) {
+             U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
+                 "in 2000ms): " + addrs0);
+
+             if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout)
+                 return null;
+
+             Thread.sleep(2000);
+         }
+     }
+ }
+
+ /**
+ * Makes {@code topVer} the current topology version and, unless the history already holds an
+ * entry for it, records a snapshot of all currently visible nodes under that version. The oldest
+ * entry is evicted once the history grows beyond the adapter's configured history size.
+ *
+ * @param topVer New topology version.
+ * @return Latest topology snapshot.
+ */
+ private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
+ this.topVer = topVer;
+
+ NavigableSet<ClusterNode> allNodes = allVisibleNodes();
+
+ if (!topHist.containsKey(topVer)) {
+ assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+ "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+ topHist.put(topVer, allNodes);
+
+ if (topHist.size() > adapter.topHistSize)
+ topHist.pollFirstEntry();
+
+ assert topHist.lastKey() == topVer;
+ assert topHist.size() <= adapter.topHistSize;
+ }
+
+ return allNodes;
+ }
+
+ /**
+ * Collects every visible remote node plus the local node into a new sorted set.
+ *
+ * @return All nodes.
+ */
+ private NavigableSet<ClusterNode> allVisibleNodes() {
+     NavigableSet<ClusterNode> visible = new TreeSet<>();
+
+     for (TcpDiscoveryNode rmt : rmtNodes.values()) {
+         if (!rmt.visible())
+             continue;
+
+         visible.add(rmt);
+     }
+
+     visible.add(locNode);
+
+     return visible;
+ }
+
+ /**
+ * Opens a socket to the given address and performs the client handshake: a
+ * {@code TcpDiscoveryHandshakeRequest} flagged as a client request is written and the
+ * {@code TcpDiscoveryHandshakeResponse} is read using the adapter's acknowledgement timeout.
+ * <p>
+ * If anything fails after the socket has been opened, the socket is closed before the failure
+ * propagates; the caller never receives the socket in that case and so could not close it.
+ *
+ * @param addr Address.
+ * @return Tuple of the opened socket and the ID of the remote node that answered the handshake.
+ * @throws IOException In case of I/O error.
+ * @throws IgniteCheckedException In case of other error.
+ */
+ private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
+     assert addr != null;
+
+     Socket sock = adapter.openSocket(addr);
+
+     boolean success = false;
+
+     try {
+         TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
+
+         req.client(true);
+
+         adapter.writeToSocket(sock, req);
+
+         TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, adapter.ackTimeout);
+
+         UUID nodeId = res.creatorNodeId();
+
+         assert nodeId != null;
+         assert !getLocalNodeId().equals(nodeId);
+
+         IgniteBiTuple<Socket, UUID> t = F.t(sock, nodeId);
+
+         success = true;
+
+         return t;
+     }
+     finally {
+         // Ownership passes to the caller only on success.
+         if (!success)
+             U.closeQuiet(sock);
+     }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Logs a warning, then interrupts and joins the socket writer, the message worker and the
+ * adapter's socket timeout worker. The socket reader is not touched by this method.
+ */
+ @Override void simulateNodeFailure() {
+ U.warn(log, "Simulating client node failure: " + getLocalNodeId());
+
+ U.interrupt(sockWriter);
+ U.interrupt(msgWorker);
+ U.interrupt(adapter.sockTimeoutWorker);
+
+ U.join(sockWriter, log);
+ U.join(msgWorker, log);
+ U.join(adapter.sockTimeoutWorker, log);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Quietly closes the socket currently held by the message worker.
+ */
+ @Override public void brakeConnection() {
+ U.closeQuiet(msgWorker.currSock);
+ }
+
+ @Override protected IgniteSpiThread workerThread() { // The message worker serves as this implementation's worker thread.
+ return msgWorker;
+ }
+
+ /**
+ * FOR TEST PURPOSE ONLY!
+ * <p>
+ * Blocks until the message that was last in the message worker queue at the time of the call has
+ * left the queue, the message worker is no longer alive, or the calling thread is interrupted.
+ * On interruption the interrupt flag is restored and the method returns; continuing to poll
+ * would make every following sleep fail immediately and turn the loop into a busy spin.
+ */
+ public void waitForClientMessagePrecessed() {
+     Object last = msgWorker.queue.peekLast();
+
+     while (last != null && msgWorker.isAlive() && msgWorker.queue.contains(last)) {
+         try {
+             Thread.sleep(10);
+         }
+         catch (InterruptedException ignored) {
+             // Preserve the interrupt for the caller and stop waiting.
+             Thread.currentThread().interrupt();
+
+             return;
+         }
+     }
+ }
+
+ /**
+ * Timer task that sends a client heartbeat carrying the current node metrics.
+ */
+ private class HeartbeatSender extends TimerTask {
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Does nothing while the SPI is stopping or the socket writer has no socket.
+      */
+     @Override public void run() {
+         if (adapter.getSpiContext().isStopping() || !sockWriter.isOnline())
+             return;
+
+         TcpDiscoveryClientHeartbeatMessage hbMsg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
+             adapter.metricsProvider.metrics());
+
+         hbMsg.client(true);
+
+         sockWriter.sendMessage(hbMsg);
+     }
+ }
+
+ /**
+ * Socket reader.
+ * <p>
+ * Thread that waits until a socket is assigned through {@link #setSocket(Socket, UUID)}, then
+ * reads and unmarshals discovery messages from it and hands them to the message worker. When the
+ * connection fails with an I/O error, a {@code SocketClosedMessage} is queued for the message
+ * worker, the socket is closed and the thread goes back to waiting for the next socket.
+ */
+ private class SocketReader extends IgniteSpiThread {
+ /** Mutex guarding the socket and remote node ID; also used to wake the reader when a socket is assigned. */
+ private final Object mux = new Object();
+
+ /** Socket to read from, or {@code null} when none is assigned. */
+ private Socket sock;
+
+ /** ID of the remote node on the other side of the socket. */
+ private UUID rmtNodeId;
+
+ /**
+ * Creates the reader thread named {@code tcp-client-disco-sock-reader}.
+ */
+ protected SocketReader() {
+ super(adapter.ignite().name(), "tcp-client-disco-sock-reader", log);
+ }
+
+ /**
+ * Assigns the socket to read from and wakes the reader thread up.
+ *
+ * @param sock Socket.
+ * @param rmtNodeId Rmt node id.
+ */
+ public void setSocket(Socket sock, UUID rmtNodeId) {
+ synchronized (mux) {
+ this.sock = sock;
+
+ this.rmtNodeId = rmtNodeId;
+
+ mux.notifyAll();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Outer loop: obtain the currently assigned socket (waiting on the mutex while there is none).
+ * Inner loop: unmarshal messages from that socket, stamp them with the remote node ID as sender,
+ * update statistics and the last ensured message ID, and queue them on the message worker.
+ * An unmarshalling failure caused by an {@code IOException} ends the inner loop; any other
+ * unmarshalling failure is logged and reading continues.
+ */
+ @Override protected void body() throws InterruptedException {
+ while (!isInterrupted()) {
+ Socket sock;
+ UUID rmtNodeId;
+
+ synchronized (mux) {
+ if (this.sock == null) {
+ mux.wait(); // Sleep until setSocket() supplies a socket.
+
+ continue;
+ }
+
+ sock = this.sock;
+ rmtNodeId = this.rmtNodeId;
+ }
+
+ try {
+ InputStream in = new BufferedInputStream(sock.getInputStream());
+
+ sock.setKeepAlive(true);
+ sock.setTcpNoDelay(true);
+
+ while (!isInterrupted()) {
+ TcpDiscoveryAbstractMessage msg;
+
+ try {
+ msg = adapter.marsh.unmarshal(in, U.gridClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to read message [sock=" + sock + ", " +
+ "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
+
+ IOException ioEx = X.cause(e, IOException.class);
+
+ if (ioEx != null)
+ throw ioEx;
+
+ ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
+
+ if (clsNotFoundEx != null)
+ LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
+ "(make sure same versions of all classes are available on all nodes) " +
+ "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
+ else
+ LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
+ getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');
+
+ continue; // Not an I/O failure: skip this message and keep reading.
+ }
+
+ msg.senderNodeId(rmtNodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been received: " + msg);
+
+ adapter.stats.onMessageReceived(msg);
+
+ if (adapter.ensured(msg))
+ lastMsgId = msg.id(); // Remembered so that a reconnect message can carry it.
+
+ msgWorker.addMessage(msg);
+ }
+ }
+ catch (IOException e) {
+ msgWorker.addMessage(new SocketClosedMessage(sock));
+
+ if (log.isDebugEnabled())
+ U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
+ }
+ finally {
+ U.closeQuiet(sock);
+
+ synchronized (mux) {
+ if (this.sock == sock) { // Clear only if no newer socket has been assigned meanwhile.
+ this.sock = null;
+ this.rmtNodeId = null;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Socket writer.
+ * <p>
+ * Thread that takes discovery messages from its internal queue and writes them to the currently
+ * assigned socket. It idles while no socket is assigned or the queue is empty.
+ */
+ private class SocketWriter extends IgniteSpiThread {
+ /** Mutex guarding the socket and the message queue; also used to wake the writer up. */
+ private final Object mux = new Object();
+
+ /** Socket to write to, or {@code null} when the writer is offline. */
+ private Socket sock;
+
+ /** Messages waiting to be sent. */
+ private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
+
+ /**
+ * Creates the writer thread named {@code tcp-client-disco-sock-writer}.
+ */
+ protected SocketWriter() {
+ super(adapter.ignite().name(), "tcp-client-disco-sock-writer", log);
+ }
+
+ /**
+ * Queues a message for sending and wakes the writer thread up.
+ *
+ * @param msg Message.
+ */
+ private void sendMessage(TcpDiscoveryAbstractMessage msg) {
+ synchronized (mux) {
+ queue.add(msg);
+
+ mux.notifyAll();
+ }
+ }
+
+ /**
+ * Assigns the socket to write to and wakes the writer thread up.
+ *
+ * @param sock Socket.
+ */
+ private void setSocket(Socket sock) {
+ synchronized (mux) {
+ this.sock = sock;
+
+ mux.notifyAll();
+ }
+ }
+
+ /**
+ * @return {@code true} if a socket is currently assigned.
+ */
+ public boolean isOnline() {
+ synchronized (mux) {
+ return sock != null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Each iteration waits for both a socket and a message, notifies the adapter's send listeners
+ * and writes the message. The message is dropped after a successful write or after an
+ * {@code IgniteCheckedException}; after an {@code IOException} the socket is closed and cleared
+ * while the message is kept, so it is written again once a new socket is assigned.
+ * NOTE(review): the debug text logged on {@code IOException} mentions a "node left message"
+ * although this path handles every message type - confirm whether that wording is intended.
+ */
+ @Override protected void body() throws InterruptedException {
+ TcpDiscoveryAbstractMessage msg = null;
+
+ while (!Thread.currentThread().isInterrupted()) {
+ Socket sock;
+
+ synchronized (mux) {
+ sock = this.sock;
+
+ if (sock == null) {
+ mux.wait();
+
+ continue;
+ }
+
+ if (msg == null)
+ msg = queue.poll();
+
+ if (msg == null) {
+ mux.wait();
+
+ continue;
+ }
+ }
+
+ for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs)
+ msgLsnr.apply(msg);
+
+ try {
+ adapter.writeToSocket(sock, msg);
+
+ msg = null;
+ }
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e);
+
+ U.closeQuiet(sock);
+
+ synchronized (mux) {
+ if (sock == this.sock)
+ this.sock = null; // Connection is dead.
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + msg, e);
+
+ msg = null;
+ }
+ }
+ }
+ }
+
+ /**
+  * Thread that re-joins the topology after the connection was lost.
+  * <p>
+  * It opens a new socket through {@code joinTopology(true)} and reads messages from it until the
+  * {@link TcpDiscoveryClientReconnectMessage} created by the local node arrives. A successful response is handed
+  * to the message worker; in every other case the socket is closed and {@code SPI_RECONNECT_FAILED} is queued.
+  */
+ private class Reconnector extends IgniteSpiThread {
+     /** Socket obtained by the reconnect attempt; read by the message worker on success. */
+     private volatile Socket sock;
+
+     /**
+      * Creates the reconnector thread.
+      */
+     protected Reconnector() {
+         // Own thread name, so that the reconnector is not confused with the message worker in thread dumps.
+         super(adapter.ignite().name(), "tcp-client-disco-reconnector", log);
+     }
+
+     /**
+      * Interrupts the thread and closes its socket.
+      */
+     public void cancel() {
+         interrupt();
+
+         U.closeQuiet(sock);
+     }
+
+     /** {@inheritDoc} */
+     @Override protected void body() throws InterruptedException {
+         assert !segmented;
+
+         boolean success = false;
+
+         try {
+             sock = joinTopology(true);
+
+             if (sock == null) {
+                 U.error(log, "Failed to reconnect to cluster: timeout.");
+
+                 return;
+             }
+
+             if (isInterrupted())
+                 throw new InterruptedException();
+
+             InputStream in = new BufferedInputStream(sock.getInputStream());
+
+             sock.setKeepAlive(true);
+             sock.setTcpNoDelay(true);
+
+             // Wait for the reconnect response addressed to the local node, skipping all other messages.
+             while (!isInterrupted()) {
+                 TcpDiscoveryAbstractMessage msg = adapter.marsh.unmarshal(in, U.gridClassLoader());
+
+                 if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+                     TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
+
+                     if (res.creatorNodeId().equals(getLocalNodeId())) {
+                         if (res.success()) {
+                             msgWorker.addMessage(res);
+
+                             success = true;
+                         }
+
+                         break;
+                     }
+                 }
+             }
+         }
+         catch (IOException | IgniteCheckedException e) {
+             U.error(log, "Failed to reconnect", e);
+         }
+         finally {
+             if (!success) {
+                 U.closeQuiet(sock);
+
+                 msgWorker.addMessage(SPI_RECONNECT_FAILED);
+             }
+         }
+     }
+ }
+
+ /**
+ * Message worker.
+ */
+ protected class MessageWorker extends IgniteSpiThread {
+ /** Queue of items to process: discovery messages, {@link SocketClosedMessage}s and internal marker objects. */
+ private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
+
+ /** Socket of the current connection; set to {@code null} when that socket is reported closed. */
+ private Socket currSock;
+
+ /** Indicates that pending messages are currently processed. */
+ private boolean pending;
+
+ /** Reconnector started after the connection was lost; reset to {@code null} when a reconnect succeeds. */
+ private Reconnector reconnector;
+
+ /**
+ * Creates the message worker thread.
+ */
+ private MessageWorker() {
+ super(adapter.ignite().name(), "tcp-client-disco-msg-worker", log);
+ }
+
+ /**
+ * Joins the topology and then processes queued items until the join fails or the thread is interrupted.
+ * <p>
+ * Queue items are either internal markers ({@code JOIN_TIMEOUT}, {@code SPI_STOP},
+ * {@code SPI_RECONNECT_FAILED}), {@link SocketClosedMessage}s, or discovery messages, which are passed on to
+ * {@link #processDiscoveryMessage(TcpDiscoveryAbstractMessage)}.
+ * <p>
+ * On exit the current socket is closed, {@code joinLatch} is released with {@code joinErr} set if the join is
+ * still pending, and the reconnector, if any, is cancelled and joined.
+ *
+ * @throws InterruptedException If the thread is interrupted while waiting.
+ */
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override protected void body() throws InterruptedException {
+ adapter.stats.onJoinStarted();
+
+ try {
+ final Socket sock = joinTopology(false); // Reconnector passes 'true' here; presumably the reconnect flag.
+
+ if (sock == null) {
+ joinErr = new IgniteSpiException("Join process timed out");
+
+ joinLatch.countDown();
+
+ return;
+ }
+
+ currSock = sock;
+
+ sockWriter.setSocket(sock);
+
+ timer.schedule(new TimerTask() { // Queues JOIN_TIMEOUT after netTimeout if the join is still pending.
+ @Override public void run() {
+ if (joinLatch.getCount() > 0)
+ queue.add(JOIN_TIMEOUT);
+ }
+ }, adapter.netTimeout);
+
+ sockReader.setSocket(sock, locNode.clientRouterNodeId());
+
+ while (true) {
+ Object msg = queue.take();
+
+ if (msg == JOIN_TIMEOUT) {
+ if (joinLatch.getCount() > 0) { // The marker is ignored if the join has completed in the meantime.
+ joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
+ ", timeout=" + adapter.netTimeout + ']');
+
+ joinLatch.countDown();
+
+ break;
+ }
+ }
+ else if (msg == SPI_STOP) {
+ assert adapter.getSpiContext().isStopping();
+
+ if (currSock != null) {
+ TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+
+ leftMsg.client(true);
+
+ sockWriter.sendMessage(leftMsg);
+ }
+ else
+ leaveLatch.countDown(); // No connection to send the node left message over.
+ }
+ else if (msg instanceof SocketClosedMessage) {
+ if (((SocketClosedMessage)msg).sock == currSock) { // Notifications for other sockets are ignored.
+ currSock = null;
+
+ if (joinLatch.getCount() > 0) {
+ joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
+
+ joinLatch.countDown();
+
+ break;
+ }
+ else {
+ if (adapter.getSpiContext().isStopping() || segmented)
+ leaveLatch.countDown();
+ else {
+ assert reconnector == null;
+
+ final Reconnector reconnector = new Reconnector();
+ this.reconnector = reconnector;
+ reconnector.start();
+
+ timer.schedule(new TimerTask() { // Cancels the reconnector if it still runs after netTimeout.
+ @Override public void run() {
+ if (reconnector.isAlive())
+ reconnector.cancel();
+ }
+ }, adapter.netTimeout);
+ }
+ }
+ }
+ }
+ else if (msg == SPI_RECONNECT_FAILED) {
+ if (!segmented) {
+ segmented = true;
+
+ reconnector.cancel();
+ reconnector.join();
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+ }
+ else {
+ TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
+
+ if (joinLatch.getCount() > 0) { // While joining, rejection messages abort the join with an error.
+ IgniteSpiException err = null;
+
+ if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
+ err = adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+ else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
+ err = adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
+ err = adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+
+ if (err != null) {
+ joinErr = err;
+
+ joinLatch.countDown();
+
+ break;
+ }
+ }
+
+ processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
+ }
+ }
+ }
+ finally {
+ U.closeQuiet(currSock);
+
+ if (joinLatch.getCount() > 0) {
+ // This should not occur.
+ joinErr = new IgniteSpiException("Some error occurs in joinig process");
+
+ joinLatch.countDown();
+ }
+
+ if (reconnector != null) {
+ reconnector.cancel();
+
+ reconnector.join();
+ }
+ }
+ }
+
+ /**
+  * Dispatches a discovery message to the handler for its concrete type. Messages of other types are only
+  * reported to the statistics.
+  *
+  * @param msg Message to process.
+  */
+ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
+     assert msg != null;
+     assert msg.verified() || msg.senderNodeId() == null;
+
+     adapter.stats.onMessageProcessingStarted(msg);
+
+     // The order of the checks is kept as is.
+     if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+         processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+     }
+     else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+         processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+     }
+     else if (msg instanceof TcpDiscoveryNodeLeftMessage) {
+         processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+     }
+     else if (msg instanceof TcpDiscoveryNodeFailedMessage) {
+         processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+     }
+     else if (msg instanceof TcpDiscoveryHeartbeatMessage) {
+         processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+     }
+     else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+         processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+     }
+     else if (msg instanceof TcpDiscoveryCustomEventMessage) {
+         processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+     }
+     else if (msg instanceof TcpDiscoveryClientPingResponse) {
+         processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
+     }
+     else if (msg instanceof TcpDiscoveryPingRequest) {
+         processPingRequest();
+     }
+
+     adapter.stats.onMessageProcessingFinished(msg);
+ }
+
+ /**
+  * Handles a node added message.
+  * <p>
+  * For the local node (while the join is still pending) the topology carried by the message is stored in
+  * {@code rmtNodes} and the topology history is replaced. For a remote node the node is registered and its
+  * discovery data, if present, is passed to {@code adapter.onExchange(..)}.
+  *
+  * @param msg Message.
+  */
+ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+     if (adapter.getSpiContext().isStopping())
+         return;
+
+     TcpDiscoveryNode addedNode = msg.node();
+
+     UUID addedId = addedNode.id();
+
+     if (!getLocalNodeId().equals(addedId)) {
+         // Remote node: nothing to do if it is already known.
+         if (rmtNodes.putIfAbsent(addedId, addedNode) != null)
+             return;
+
+         if (log.isDebugEnabled())
+             log.debug("Added new node to topology: " + addedNode);
+
+         Map<Integer, byte[]> discoData = msg.newNodeDiscoveryData();
+
+         if (discoData != null)
+             adapter.onExchange(addedId, addedId, discoData, null);
+
+         return;
+     }
+
+     // Local node from here on.
+     if (joinLatch.getCount() <= 0) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node added message (this message has already been processed) " +
+                 "[msg=" + msg + ", locNode=" + locNode + ']');
+
+         return;
+     }
+
+     Collection<TcpDiscoveryNode> snapshot = msg.topology();
+
+     if (snapshot == null) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node added message with empty topology: " + msg);
+
+         return;
+     }
+
+     adapter.gridStartTime = msg.gridStartTime();
+
+     for (TcpDiscoveryNode rmtNode : snapshot) {
+         if (rmtNode.order() > 0)
+             rmtNode.visible(true);
+
+         rmtNodes.put(rmtNode.id(), rmtNode);
+     }
+
+     topHist.clear();
+
+     if (msg.topologyHistory() != null)
+         topHist.putAll(msg.topologyHistory());
+ }
+
+ /**
+  * Handles a node add finished message.
+  * <p>
+  * For the local node (while the join is still pending) this completes the join: exchange data is applied,
+  * the local node becomes visible and gets its order, {@code EVT_NODE_JOINED} is fired and {@code joinLatch}
+  * is released. For a known remote node the node becomes visible and {@code EVT_NODE_JOINED} is fired unless
+  * the local join is still pending and no pending messages are being processed.
+  *
+  * @param msg Message.
+  */
+ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+     if (adapter.getSpiContext().isStopping())
+         return;
+
+     if (!getLocalNodeId().equals(msg.nodeId())) {
+         // Remote node.
+         TcpDiscoveryNode joined = rmtNodes.get(msg.nodeId());
+
+         if (joined == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
+
+             return;
+         }
+
+         long ver = msg.topologyVersion();
+
+         joined.order(ver);
+         joined.visible(true);
+
+         if (adapter.locNodeVer.equals(joined.version()))
+             joined.version(adapter.locNodeVer);
+
+         NavigableSet<ClusterNode> snapshot = updateTopologyHistory(ver);
+
+         if (!pending && joinLatch.getCount() > 0) {
+             if (log.isDebugEnabled())
+                 log.debug("Discarding node add finished message (join process is not finished): " + msg);
+
+             return;
+         }
+
+         notifyDiscovery(EVT_NODE_JOINED, ver, joined, snapshot);
+
+         adapter.stats.onNodeJoined();
+
+         return;
+     }
+
+     // Local node from here on.
+     if (joinLatch.getCount() <= 0) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node add finished message (this message has already been processed) " +
+                 "[msg=" + msg + ", locNode=" + locNode + ']');
+
+         return;
+     }
+
+     Map<UUID, Map<Integer, byte[]>> exchData = msg.clientDiscoData();
+
+     if (exchData != null) {
+         for (Map.Entry<UUID, Map<Integer, byte[]>> e : exchData.entrySet())
+             adapter.onExchange(getLocalNodeId(), e.getKey(), e.getValue(), null);
+     }
+
+     locNode.setAttributes(msg.clientNodeAttributes());
+     locNode.visible(true);
+
+     long ver = msg.topologyVersion();
+
+     locNode.order(ver);
+
+     notifyDiscovery(EVT_NODE_JOINED, ver, locNode, updateTopologyHistory(ver));
+
+     joinErr = null;
+
+     joinLatch.countDown();
+
+     adapter.stats.onJoinFinished();
+ }
+
+ /**
+  * Handles a node left message.
+  * <p>
+  * A message created by the local node releases {@code leaveLatch}. For a remote node the node is removed
+  * from {@code rmtNodes}, the topology history is updated and {@code EVT_NODE_LEFT} is fired unless the local
+  * join is still pending and no pending messages are being processed.
+  *
+  * @param msg Message.
+  */
+ private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
+     if (getLocalNodeId().equals(msg.creatorNodeId())) {
+         if (log.isDebugEnabled())
+             log.debug("Received node left message for local node: " + msg);
+
+         leaveLatch.countDown();
+
+         return;
+     }
+
+     if (adapter.getSpiContext().isStopping())
+         return;
+
+     TcpDiscoveryNode leftNode = rmtNodes.remove(msg.creatorNodeId());
+
+     if (leftNode == null) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
+
+         return;
+     }
+
+     NavigableSet<ClusterNode> snapshot = updateTopologyHistory(msg.topologyVersion());
+
+     if (!pending && joinLatch.getCount() > 0) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node left message (join process is not finished): " + msg);
+
+         return;
+     }
+
+     notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), leftNode, snapshot);
+
+     adapter.stats.onNodeLeft();
+ }
+
+ /**
+  * Handles a node failed message.
+  * <p>
+  * While the SPI is stopping, the only effect is to release {@code leaveLatch} when another node has
+  * reported the local node as failed. Otherwise, for a message not created by the local node, the failed node
+  * is removed from {@code rmtNodes}, the topology history is updated and {@code EVT_NODE_FAILED} is fired
+  * unless the local join is still pending and no pending messages are being processed.
+  *
+  * @param msg Message.
+  */
+ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
+     if (adapter.getSpiContext().isStopping()) {
+         if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
+             if (leaveLatch.getCount() > 0) {
+                 // Guarded like every other debug statement in this class.
+                 if (log.isDebugEnabled())
+                     log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
+                         + ", rmtNode=" + msg.creatorNodeId() + ']');
+
+                 leaveLatch.countDown();
+             }
+         }
+
+         return;
+     }
+
+     if (!getLocalNodeId().equals(msg.creatorNodeId())) {
+         TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
+
+         if (node == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
+
+             return;
+         }
+
+         NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+
+         if (!pending && joinLatch.getCount() > 0) {
+             if (log.isDebugEnabled())
+                 log.debug("Discarding node failed message (join process is not finished): " + msg);
+
+             return;
+         }
+
+         notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
+
+         adapter.stats.onNodeFailed();
+     }
+ }
+
+ /**
+  * Handles a heartbeat message.
+  * <p>
+  * A heartbeat created by the local node is only logged. For any other heartbeat the node metrics and client
+  * metrics carried by the message are applied through
+  * {@link #updateMetrics(UUID, ClusterMetrics, Map, long)}.
+  *
+  * @param msg Message.
+  */
+ private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
+     if (adapter.getSpiContext().isStopping())
+         return;
+
+     if (getLocalNodeId().equals(msg.creatorNodeId())) {
+         assert msg.senderNodeId() != null;
+
+         if (log.isDebugEnabled())
+             log.debug("Received heartbeat response: " + msg);
+     }
+     else {
+         long tstamp = U.currentTimeMillis();
+
+         if (msg.hasMetrics()) {
+             for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) {
+                 UUID nodeId = e.getKey();
+
+                 TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue();
+
+                 Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
+                     msg.cacheMetrics().get(nodeId) : null;
+
+                 // The message may carry cache metrics for some nodes only; updateMetrics(..) requires
+                 // a non-null map, so fall back to an empty one when this node has no entry.
+                 if (cacheMetrics == null)
+                     cacheMetrics = Collections.<Integer, CacheMetrics>emptyMap();
+
+                 updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
+
+                 for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
+                     updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
+             }
+         }
+     }
+ }
+
+ /**
+  * Handles a client reconnect message.
+  * <p>
+  * For a message created by the local node the reconnector's socket becomes the current one for the worker,
+  * the socket writer and the socket reader, and the pending messages carried by the message are processed
+  * with the {@code pending} flag raised. Messages for other clients are discarded.
+  *
+  * @param msg Message.
+  */
+ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+     if (adapter.getSpiContext().isStopping())
+         return;
+
+     if (!getLocalNodeId().equals(msg.creatorNodeId())) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding reconnect message for another client: " + msg);
+
+         return;
+     }
+
+     assert msg.success();
+
+     Socket newSock = reconnector.sock;
+
+     currSock = newSock;
+
+     sockWriter.setSocket(newSock);
+     sockReader.setSocket(newSock, locNode.clientRouterNodeId());
+
+     reconnector = null;
+
+     pending = true;
+
+     try {
+         for (TcpDiscoveryAbstractMessage missed : msg.pendingMessages())
+             processDiscoveryMessage(missed);
+     }
+     finally {
+         pending = false;
+     }
+ }
+
+ /**
+  * Handles a custom event message.
+  * <p>
+  * The message is processed only if it is verified, the local join has completed and a discovery listener
+  * is set. {@code EVT_DISCOVERY_CUSTOM_EVT} is then fired on behalf of the creator node, provided that node is
+  * known and visible.
+  *
+  * @param msg Message.
+  */
+ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+     if (msg.verified() && joinLatch.getCount() == 0) {
+         DiscoverySpiListener lsnr = adapter.lsnr;
+
+         if (lsnr != null) {
+             UUID nodeId = msg.creatorNodeId();
+
+             TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
+
+             if (node != null && node.visible()) {
+                 try {
+                     DiscoverySpiCustomMessage msgObj = msg.message(adapter.marsh);
+
+                     notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
+                 }
+                 catch (Throwable e) {
+                     U.error(log, "Failed to unmarshal discovery custom message.", e);
+                 }
+             }
+             else if (log.isDebugEnabled())
+                 // This branch is about a custom message, not metrics.
+                 log.debug("Received custom message from unknown node: " + nodeId);
+         }
+     }
+ }
+
+ /**
+  * Completes the ping future registered for the pinged node, if there is one, with the result from the
+  * response.
+  *
+  * @param msg Ping response.
+  */
+ private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
+     GridFutureAdapter<Boolean> pingFut = pingFuts.remove(msg.nodeToPing());
+
+     if (pingFut == null)
+         return;
+
+     pingFut.onDone(msg.result());
+ }
+
+ /**
+  * Answers a ping request from the router: queues a ping response, marked as coming from a client, to the
+  * socket writer.
+  */
+ private void processPingRequest() {
+     TcpDiscoveryPingResponse pingRes = new TcpDiscoveryPingResponse(getLocalNodeId());
+
+     pingRes.client(true);
+
+     sockWriter.sendMessage(pingRes);
+ }
+
+ /**
+  * Stores new metrics on the given node and fires {@code EVT_NODE_METRICS_UPDATED}. Nothing is done for
+  * a node that is unknown or not visible.
+  *
+  * @param nodeId Node ID.
+  * @param metrics Metrics.
+  * @param cacheMetrics Cache metrics.
+  * @param tstamp Timestamp.
+  */
+ private void updateMetrics(UUID nodeId,
+     ClusterMetrics metrics,
+     Map<Integer, CacheMetrics> cacheMetrics,
+     long tstamp)
+ {
+     assert nodeId != null;
+     assert metrics != null;
+     assert cacheMetrics != null;
+
+     TcpDiscoveryNode target;
+
+     if (nodeId.equals(getLocalNodeId()))
+         target = locNode;
+     else
+         target = rmtNodes.get(nodeId);
+
+     if (target == null || !target.visible()) {
+         if (log.isDebugEnabled())
+             log.debug("Received metrics from unknown node: " + nodeId);
+
+         return;
+     }
+
+     target.setMetrics(metrics);
+     target.setCacheMetrics(cacheMetrics);
+
+     target.lastUpdateTime(tstamp);
+
+     notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, target, allVisibleNodes());
+ }
+
+ /**
+ * Notifies the discovery listener about an event that carries no custom message; delegates to the
+ * five-argument overload with {@code null} data.
+ *
+ * @param type Event type.
+ * @param topVer Topology version.
+ * @param node Node.
+ * @param top Topology snapshot.
+ */
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) {
+ notifyDiscovery(type, topVer, node, top, null);
+ }
+
+ /**
+  * Passes a discovery event, together with a copy of the topology history, to the discovery listener.
+  * If no listener is set the event is skipped.
+  *
+  * @param type Event type.
+  * @param topVer Topology version.
+  * @param node Node.
+  * @param top Topology snapshot.
+  * @param data Custom message attached to the event, may be {@code null}.
+  */
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
+     @Nullable DiscoverySpiCustomMessage data) {
+     DiscoverySpiListener lsnr = adapter.lsnr;
+
+     if (lsnr == null) {
+         if (log.isDebugEnabled())
+             log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                 ", topVer=" + topVer + ']');
+
+         return;
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+             ", topVer=" + topVer + ']');
+
+     lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data);
+ }
+
+ /**
+ * Adds an item to the worker queue; it is taken and processed by the worker thread in {@code body()}.
+ *
+ * @param msg Message.
+ */
+ public void addMessage(Object msg) {
+ queue.add(msg);
+ }
+
+ /**
+ * @return Number of items currently in the worker queue.
+ */
+ public int queueSize() {
+ return queue.size();
+ }
+ }
+
+ /**
+ * Internal queue item telling the message worker that the connection over the given socket has failed.
+ */
+ private static class SocketClosedMessage {
+ /** Socket whose connection has failed. */
+ private final Socket sock;
+
+ /**
+ * @param sock Socket.
+ */
+ private SocketClosedMessage(Socket sock) {
+ this.sock = sock;
+ }
+ }
+}