You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/01 13:44:55 UTC
[03/53] [abbrv] incubator-ignite git commit: # IGNITE-943 Merge
TcpDiscoverySpi and TcpClientDiscoverySpi
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
index b988ceb..698735e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
@@ -44,7 +44,7 @@ class SocketMultiConnector implements AutoCloseable {
* @param addrs Addresses.
* @param retryCnt Retry count.
*/
- SocketMultiConnector(final TcpDiscoverySpiAdapter spi, Collection<InetSocketAddress> addrs,
+ SocketMultiConnector(final TcpDiscoverySpi spi, Collection<InetSocketAddress> addrs,
final int retryCnt) {
connInProgress = addrs.size();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
deleted file mode 100644
index 52c9016..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ /dev/null
@@ -1,1573 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.*;
-import org.apache.ignite.spi.discovery.tcp.internal.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
-import org.apache.ignite.spi.discovery.tcp.messages.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
-import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
-
-/**
- * Client discovery SPI implementation that uses TCP/IP for node discovery.
- * <p>
- * This discovery SPI requires at least one server node configured with
- * {@link TcpDiscoverySpi}. It will try to connect to random IP taken from
- * {@link TcpDiscoveryIpFinder} which should point to one of these server
- * nodes and will maintain connection only with this node (will not enter the ring).
- * If this connection is broken, it will try to reconnect using addresses from
- * the same IP finder.
- *
- * <h1 class="header">Configuration</h1>
- * <h2 class="header">Mandatory</h2>
- * There are no mandatory configuration parameters.
- * <h2 class="header">Optional</h2>
- * The following configuration parameters are optional:
- * <ul>
- * <li>IP finder to share info about nodes IP addresses
- * (see {@link #setIpFinder(TcpDiscoveryIpFinder)}).
- * See the following IP finder implementations for details on configuration:
- * <ul>
- * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li>
- * <li>{@ignitelink org.apache.ignite.spi.discovery.tcp.ipfinder.s3.TcpDiscoveryS3IpFinder}</li>
- * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li>
- * <li>{@link org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li>
- * <li>{@link TcpDiscoveryMulticastIpFinder} - default</li>
- * </ul>
- * </li>
- * </ul>
- * <ul>
- * <li>Local address (see {@link #setLocalAddress(String)})</li>
- * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li>
- * <li>Network timeout (see {@link #setNetworkTimeout(long)})</li>
- * <li>Socket timeout (see {@link #setSocketTimeout(long)})</li>
- * <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)})</li>
- * <li>Join timeout (see {@link #setJoinTimeout(long)})</li>
- * <li>Thread priority for threads started by SPI (see {@link #setThreadPriority(int)})</li>
- * </ul>
- * <h2 class="header">Java Example</h2>
- * <pre name="code" class="java">
- * TcpClientDiscoverySpi spi = new TcpClientDiscoverySpi();
- *
- * TcpDiscoveryVmIpFinder finder =
- * new GridTcpDiscoveryVmIpFinder();
- *
- * spi.setIpFinder(finder);
- *
- * IgniteConfiguration cfg = new IgniteConfiguration();
- *
- * // Override default discovery SPI.
- * cfg.setDiscoverySpi(spi);
- *
- * // Start grid.
- * Ignition.start(cfg);
- * </pre>
- * <h2 class="header">Spring Example</h2>
- * TcpClientDiscoverySpi can be configured from Spring XML configuration file:
- * <pre name="code" class="xml">
- * <bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
- * ...
- * <property name="discoverySpi">
- * <bean class="org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpi">
- * <property name="ipFinder">
- * <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder" />
- * </property>
- * </bean>
- * </property>
- * ...
- * </bean>
- * </pre>
- * <p>
- * <img src="http://ignite.incubator.apache.org/images/spring-small.png">
- * <br>
- * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
- * @see DiscoverySpi
- */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-@IgniteSpiMultipleInstancesSupport(true)
-@DiscoverySpiOrderSupport(true)
-@DiscoverySpiHistorySupport(true)
-public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean {
- /** Default socket operations timeout in milliseconds (value is <tt>700ms</tt>). */
- public static final long DFLT_SOCK_TIMEOUT = 700;
-
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>700ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 700;
-
- /** */
- private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
-
- /** */
- private static final Object SPI_STOP = "SPI_STOP";
-
- /** */
- private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
-
- /** Remote nodes. */
- private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
-
- /** Topology history. */
- private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
-
- /** Remote nodes. */
- private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
-
- /** Socket writer. */
- private SocketWriter sockWriter;
-
- /** */
- private SocketReader sockReader;
-
- /** */
- private boolean segmented;
-
- /** Last message ID. */
- private volatile IgniteUuid lastMsgId;
-
- /** Current topology version. */
- private volatile long topVer;
-
- /** Join error. Contains error what occurs on join process. */
- private IgniteSpiException joinErr;
-
- /** Joined latch. */
- private final CountDownLatch joinLatch = new CountDownLatch(1);
-
- /** Left latch. */
- private final CountDownLatch leaveLatch = new CountDownLatch(1);
-
- /** */
- private final Timer timer = new Timer("TcpClientDiscoverySpi.timer");
-
- /** */
- protected MessageWorker msgWorker;
-
- /**
- * Default constructor.
- */
- public TcpClientDiscoverySpi() {
- ackTimeout = DFLT_ACK_TIMEOUT;
- sockTimeout = DFLT_SOCK_TIMEOUT;
- }
-
- /** {@inheritDoc} */
- @Override public int getMessageWorkerQueueSize() {
- return msgWorker.queueSize();
- }
-
- /** {@inheritDoc} */
- @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- startStopwatch();
-
- checkParameters();
-
- assertParameter(threadPri > 0, "threadPri > 0");
-
- if (log.isDebugEnabled()) {
- log.debug(configInfo("localHost", locHost.getHostAddress()));
- log.debug(configInfo("threadPri", threadPri));
- log.debug(configInfo("networkTimeout", netTimeout));
- log.debug(configInfo("sockTimeout", sockTimeout));
- log.debug(configInfo("ackTimeout", ackTimeout));
- log.debug(configInfo("ipFinder", ipFinder));
- log.debug(configInfo("heartbeatFreq", hbFreq));
- }
-
- // Warn on odd network timeout.
- if (netTimeout < 3000)
- U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
-
- registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class);
-
- try {
- locHost = U.resolveLocalHost(locAddr);
- }
- catch (IOException e) {
- throw new IgniteSpiException("Unknown local address: " + locAddr, e);
- }
-
- if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
- TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
-
- if (mcastIpFinder.getLocalAddress() == null)
- mcastIpFinder.setLocalAddress(locAddr);
- }
-
- IgniteBiTuple<Collection<String>, Collection<String>> addrs;
-
- try {
- addrs = U.resolveLocalAddresses(locHost);
- }
- catch (IOException | IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e);
- }
-
- locNode = new TcpDiscoveryNode(
- getLocalNodeId(),
- addrs.get1(),
- addrs.get2(),
- 0,
- metricsProvider,
- locNodeVer);
-
- locNode.setAttributes(locNodeAttrs);
- locNode.local(true);
-
- sockWriter = new SocketWriter();
- sockWriter.start();
-
- sockReader = new SocketReader();
- sockReader.start();
-
- sockTimeoutWorker = new SocketTimeoutWorker();
- sockTimeoutWorker.start();
-
- msgWorker = new MessageWorker();
- msgWorker.start();
-
- try {
- joinLatch.await();
-
- if (joinErr != null)
- throw joinErr;
- }
- catch (InterruptedException e) {
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
-
- timer.schedule(new HeartbeatSender(), hbFreq, hbFreq);
-
- if (log.isDebugEnabled())
- log.debug(startInfo());
- }
-
- /** {@inheritDoc} */
- @Override public void spiStop() throws IgniteSpiException {
- timer.cancel();
-
- if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive
- msgWorker.addMessage(SPI_STOP);
-
- try {
- if (!leaveLatch.await(netTimeout, MILLISECONDS))
- U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
- }
- catch (InterruptedException ignored) {
-
- }
- }
-
- for (GridFutureAdapter<Boolean> fut : pingFuts.values())
- fut.onDone(false);
-
- rmtNodes.clear();
-
- U.interrupt(sockTimeoutWorker);
- U.interrupt(msgWorker);
- U.interrupt(sockWriter);
- U.interrupt(sockReader);
-
- U.join(msgWorker, log);
- U.join(sockTimeoutWorker, log);
- U.join(sockWriter, log);
- U.join(sockReader, log);
-
- unregisterMBean();
-
- if (log.isDebugEnabled())
- log.debug(stopInfo());
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> getRemoteNodes() {
- return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public ClusterNode getNode(UUID nodeId) {
- if (getLocalNodeId().equals(nodeId))
- return locNode;
-
- TcpDiscoveryNode node = rmtNodes.get(nodeId);
-
- return node != null && node.visible() ? node : null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean pingNode(@NotNull final UUID nodeId) {
- if (nodeId.equals(getLocalNodeId()))
- return true;
-
- TcpDiscoveryNode node = rmtNodes.get(nodeId);
-
- if (node == null || !node.visible())
- return false;
-
- GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
-
- if (fut == null) {
- fut = new GridFutureAdapter<>();
-
- GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
-
- if (oldFut != null)
- fut = oldFut;
- else {
- if (getSpiContext().isStopping()) {
- if (pingFuts.remove(nodeId, fut))
- fut.onDone(false);
-
- return false;
- }
-
- final GridFutureAdapter<Boolean> finalFut = fut;
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (pingFuts.remove(nodeId, finalFut))
- finalFut.onDone(false);
- }
- }, netTimeout);
-
- sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
- }
- }
-
- try {
- return fut.get();
- }
- catch (IgniteInterruptedCheckedException ignored) {
- return false;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException(e); // Should newer occur
- }
- }
-
- /** {@inheritDoc} */
- @Override public void disconnect() throws IgniteSpiException {
- U.interrupt(msgWorker);
- U.interrupt(sockWriter);
- U.interrupt(sockReader);
-
- U.join(msgWorker, log);
- U.join(sockWriter, log);
- U.join(sockReader, log);
-
- leaveLatch.countDown();
- joinLatch.countDown();
-
- getSpiContext().deregisterPorts();
-
- Collection<ClusterNode> rmts = getRemoteNodes();
-
- // This is restart/disconnection and remote nodes are not empty.
- // We need to fire FAIL event for each.
- DiscoverySpiListener lsnr = this.lsnr;
-
- if (lsnr != null) {
- for (ClusterNode n : rmts) {
- rmtNodes.remove(n.id());
-
- Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
-
- lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
- }
- }
-
- rmtNodes.clear();
- }
-
- /** {@inheritDoc} */
- @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
- if (segmented)
- throw new IgniteException("Failed to send custom message: client is disconnected");
-
- try {
- sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt)));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
- ClusterNode node = rmtNodes.get(nodeId);
-
- if (node != null) {
- TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
- node.id(), node.order());
-
- msgWorker.addMessage(msg);
- }
- }
-
- /**
- * @return Opened socket or {@code null} if timeout.
- * @see #joinTimeout
- */
- @SuppressWarnings("BusyWait")
- @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
- Collection<InetSocketAddress> addrs = null;
-
- long startTime = U.currentTimeMillis();
-
- while (true) {
- if (Thread.currentThread().isInterrupted())
- throw new InterruptedException();
-
- while (addrs == null || addrs.isEmpty()) {
- addrs = resolvedAddresses();
-
- if (!F.isEmpty(addrs)) {
- if (log.isDebugEnabled())
- log.debug("Resolved addresses from IP finder: " + addrs);
- }
- else {
- U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder);
-
- if (joinTimeout > 0 && (U.currentTimeMillis() - startTime) > joinTimeout)
- return null;
-
- Thread.sleep(2000);
- }
- }
-
- Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
-
- Iterator<InetSocketAddress> it = addrs.iterator();
-
- while (it.hasNext()) {
- if (Thread.currentThread().isInterrupted())
- throw new InterruptedException();
-
- InetSocketAddress addr = it.next();
-
- Socket sock = null;
-
- try {
- long ts = U.currentTimeMillis();
-
- IgniteBiTuple<Socket, UUID> t = initConnection(addr);
-
- sock = t.get1();
-
- UUID rmtNodeId = t.get2();
-
- stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
-
- locNode.clientRouterNodeId(rmtNodeId);
-
- TcpDiscoveryAbstractMessage msg = recon ?
- new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
- lastMsgId) :
- new TcpDiscoveryJoinRequestMessage(locNode, collectExchangeData(getLocalNodeId()));
-
- msg.client(true);
-
- writeToSocket(sock, msg);
-
- int res = readReceipt(sock, ackTimeout);
-
- switch (res) {
- case RES_OK:
- return sock;
-
- case RES_CONTINUE_JOIN:
- case RES_WAIT:
- U.closeQuiet(sock);
-
- break;
-
- default:
- if (log.isDebugEnabled())
- log.debug("Received unexpected response to join request: " + res);
-
- U.closeQuiet(sock);
- }
- }
- catch (IOException | IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to establish connection with address: " + addr, e);
-
- U.closeQuiet(sock);
-
- it.remove();
- }
- }
-
- if (addrs.isEmpty()) {
- U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
- "in 2000ms): " + addrs0);
-
- if (joinTimeout > 0 && (U.currentTimeMillis() - startTime) > joinTimeout)
- return null;
-
- Thread.sleep(2000);
- }
- }
- }
-
- /**
- * @param topVer New topology version.
- * @return Latest topology snapshot.
- */
- private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
- this.topVer = topVer;
-
- NavigableSet<ClusterNode> allNodes = allVisibleNodes();
-
- if (!topHist.containsKey(topVer)) {
- assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
- "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
-
- topHist.put(topVer, allNodes);
-
- if (topHist.size() > topHistSize)
- topHist.pollFirstEntry();
-
- assert topHist.lastKey() == topVer;
- assert topHist.size() <= topHistSize;
- }
-
- return allNodes;
- }
-
- /**
- * @return All nodes.
- */
- private NavigableSet<ClusterNode> allVisibleNodes() {
- NavigableSet<ClusterNode> allNodes = new TreeSet<>();
-
- for (TcpDiscoveryNode node : rmtNodes.values()) {
- if (node.visible())
- allNodes.add(node);
- }
-
- allNodes.add(locNode);
-
- return allNodes;
- }
-
- /**
- * @param addr Address.
- * @return Remote node ID.
- * @throws IOException In case of I/O error.
- * @throws IgniteCheckedException In case of other error.
- */
- private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
- assert addr != null;
-
- Socket sock = openSocket(addr);
-
- TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
-
- req.client(true);
-
- writeToSocket(sock, req);
-
- TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout);
-
- UUID nodeId = res.creatorNodeId();
-
- assert nodeId != null;
- assert !getLocalNodeId().equals(nodeId);
-
- return F.t(sock, nodeId);
- }
-
- /**
- * FOR TEST PURPOSE ONLY!
- */
- void simulateNodeFailure() {
- U.warn(log, "Simulating client node failure: " + getLocalNodeId());
-
- U.interrupt(sockWriter);
- U.interrupt(msgWorker);
- U.interrupt(sockTimeoutWorker);
-
- U.join(sockWriter, log);
- U.join(msgWorker, log);
- U.join(sockTimeoutWorker, log);
- }
-
- /**
- * FOR TEST PURPOSE ONLY!
- */
- public void brakeConnection() {
- U.closeQuiet(msgWorker.currSock);
- }
-
- /**
- * FOR TEST PURPOSE ONLY!
- */
- public void waitForMessagePrecessed() {
- Object last = msgWorker.queue.peekLast();
-
- while (last != null && msgWorker.isAlive() && msgWorker.queue.contains(last)) {
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /**
- * Heartbeat sender.
- */
- private class HeartbeatSender extends TimerTask {
- /** {@inheritDoc} */
- @Override public void run() {
- if (!getSpiContext().isStopping() && sockWriter.isOnline()) {
- TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
- metricsProvider.metrics());
-
- msg.client(true);
-
- sockWriter.sendMessage(msg);
- }
- }
- }
-
- /**
- * Socket reader.
- */
- private class SocketReader extends IgniteSpiThread {
- /** */
- private final Object mux = new Object();
-
- /** */
- private Socket sock;
-
- /** */
- private UUID rmtNodeId;
-
- /**
- */
- protected SocketReader() {
- super(gridName, "tcp-client-disco-sock-reader", log);
- }
-
- /**
- * @param sock Socket.
- * @param rmtNodeId Rmt node id.
- */
- public void setSocket(Socket sock, UUID rmtNodeId) {
- synchronized (mux) {
- this.sock = sock;
-
- this.rmtNodeId = rmtNodeId;
-
- mux.notifyAll();
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- while (!isInterrupted()) {
- Socket sock;
- UUID rmtNodeId;
-
- synchronized (mux) {
- if (this.sock == null) {
- mux.wait();
-
- continue;
- }
-
- sock = this.sock;
- rmtNodeId = this.rmtNodeId;
- }
-
- try {
- InputStream in = new BufferedInputStream(sock.getInputStream());
-
- sock.setKeepAlive(true);
- sock.setTcpNoDelay(true);
-
- while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg;
-
- try {
- msg = marsh.unmarshal(in, U.gridClassLoader());
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to read message [sock=" + sock + ", " +
- "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
-
- IOException ioEx = X.cause(e, IOException.class);
-
- if (ioEx != null)
- throw ioEx;
-
- ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
-
- if (clsNotFoundEx != null)
- LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
- "(make sure same versions of all classes are available on all nodes) " +
- "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
- else
- LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
- getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');
-
- continue;
- }
-
- msg.senderNodeId(rmtNodeId);
-
- if (log.isDebugEnabled())
- log.debug("Message has been received: " + msg);
-
- stats.onMessageReceived(msg);
-
- if (ensured(msg))
- lastMsgId = msg.id();
-
- msgWorker.addMessage(msg);
- }
- }
- catch (IOException e) {
- msgWorker.addMessage(new SocketClosedMessage(sock));
-
- if (log.isDebugEnabled())
- U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
- }
- finally {
- U.closeQuiet(sock);
-
- synchronized (mux) {
- if (this.sock == sock) {
- this.sock = null;
- this.rmtNodeId = null;
- }
- }
- }
- }
- }
- }
-
- /**
- *
- */
- private class SocketWriter extends IgniteSpiThread {
- /** */
- private final Object mux = new Object();
-
- /** */
- private Socket sock;
-
- /** */
- private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
-
- /**
- *
- */
- protected SocketWriter() {
- super(gridName, "tcp-client-disco-sock-writer", log);
- }
-
- /**
- * @param msg Message.
- */
- private void sendMessage(TcpDiscoveryAbstractMessage msg) {
- synchronized (mux) {
- queue.add(msg);
-
- mux.notifyAll();
- }
- }
-
- /**
- * @param sock Socket.
- */
- private void setSocket(Socket sock) {
- synchronized (mux) {
- this.sock = sock;
-
- mux.notifyAll();
- }
- }
-
- /**
- *
- */
- public boolean isOnline() {
- synchronized (mux) {
- return sock != null;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- TcpDiscoveryAbstractMessage msg = null;
-
- while (!Thread.currentThread().isInterrupted()) {
- Socket sock;
-
- synchronized (mux) {
- sock = this.sock;
-
- if (sock == null) {
- mux.wait();
-
- continue;
- }
-
- if (msg == null)
- msg = queue.poll();
-
- if (msg == null) {
- mux.wait();
-
- continue;
- }
- }
-
- try {
- writeToSocket(sock, msg);
-
- msg = null;
- }
- catch (IOException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e);
-
- U.closeQuiet(sock);
-
- synchronized (mux) {
- if (sock == this.sock)
- this.sock = null; // Connection has dead.
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send message: " + msg, e);
-
- msg = null;
- }
- }
- }
- }
-
- /**
- *
- */
- private class Reconnector extends IgniteSpiThread {
- /** */
- private volatile Socket sock;
-
- /**
- *
- */
- protected Reconnector() {
- super(gridName, "tcp-client-disco-msg-worker", log);
- }
-
- /**
- *
- */
- public void cancel() {
- interrupt();
-
- U.closeQuiet(sock);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- assert !segmented;
-
- boolean success = false;
-
- try {
- sock = joinTopology(true);
-
- if (sock == null) {
- U.error(log, "Failed to reconnect to cluster: timeout.");
-
- return;
- }
-
- if (isInterrupted())
- throw new InterruptedException();
-
- InputStream in = new BufferedInputStream(sock.getInputStream());
-
- sock.setKeepAlive(true);
- sock.setTcpNoDelay(true);
-
- // Wait for
- while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader());
-
- if (msg instanceof TcpDiscoveryClientReconnectMessage) {
- TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
-
- if (res.creatorNodeId().equals(getLocalNodeId())) {
- if (res.success()) {
- msgWorker.addMessage(res);
-
- success = true;
- }
-
- break;
- }
- }
-
- }
- }
- catch (IOException | IgniteCheckedException e) {
- U.error(log, "Failed to reconnect", e);
- }
- finally {
- if (!success) {
- U.closeQuiet(sock);
-
- msgWorker.addMessage(SPI_RECONNECT_FAILED);
- }
- }
- }
- }
-
- /**
- * Message worker.
- */
- protected class MessageWorker extends IgniteSpiThread {
- /** Message queue. */
- private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
-
- /** */
- private Socket currSock;
-
- /** Indicates that pending messages are currently processed. */
- private boolean pending;
-
- /** */
- private Reconnector reconnector;
-
- /**
- *
- */
- private MessageWorker() {
- super(gridName, "tcp-client-disco-msg-worker", log);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("InfiniteLoopStatement")
- @Override protected void body() throws InterruptedException {
- stats.onJoinStarted();
-
- try {
- final Socket sock = joinTopology(false);
-
- if (sock == null) {
- joinErr = new IgniteSpiException("Join process timed out");
-
- joinLatch.countDown();
-
- return;
- }
-
- currSock = sock;
-
- sockWriter.setSocket(sock);
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (joinLatch.getCount() > 0)
- queue.add(JOIN_TIMEOUT);
- }
- }, netTimeout);
-
- sockReader.setSocket(sock, locNode.clientRouterNodeId());
-
- while (true) {
- Object msg = queue.take();
-
- if (msg == JOIN_TIMEOUT) {
- if (joinLatch.getCount() > 0) {
- joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
- ", timeout=" + netTimeout + ']');
-
- joinLatch.countDown();
-
- break;
- }
- }
- else if (msg == SPI_STOP) {
- assert getSpiContext().isStopping();
-
- if (currSock != null) {
- TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
-
- leftMsg.client(true);
-
- sockWriter.sendMessage(leftMsg);
- }
- else
- leaveLatch.countDown();
- }
- else if (msg instanceof SocketClosedMessage) {
- if (((SocketClosedMessage)msg).sock == currSock) {
- currSock = null;
-
- if (joinLatch.getCount() > 0) {
- joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
-
- joinLatch.countDown();
-
- break;
- }
- else {
- if (getSpiContext().isStopping() || segmented)
- leaveLatch.countDown();
- else {
- assert reconnector == null;
-
- final Reconnector reconnector = new Reconnector();
- this.reconnector = reconnector;
- reconnector.start();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (reconnector.isAlive())
- reconnector.cancel();
- }
- }, netTimeout);
- }
- }
- }
- }
- else if (msg == SPI_RECONNECT_FAILED) {
- if (!segmented) {
- segmented = true;
-
- reconnector.cancel();
- reconnector.join();
-
- notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
- }
- }
- else {
- TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
-
- if (joinLatch.getCount() > 0) {
- IgniteSpiException err = null;
-
- if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
- err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
- else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
- err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
- else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
- err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
-
- if (err != null) {
- joinErr = err;
-
- joinLatch.countDown();
-
- break;
- }
- }
-
- processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
- }
- }
- }
- finally {
- U.closeQuiet(currSock);
-
- if (joinLatch.getCount() > 0) {
- // This should not occurs.
- joinErr = new IgniteSpiException("Some error occurs in joinig process");
-
- joinLatch.countDown();
- }
-
- if (reconnector != null) {
- reconnector.cancel();
-
- reconnector.join();
- }
- }
- }
-
- /**
- * @param msg Message.
- */
- protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
- assert msg != null;
- assert msg.verified() || msg.senderNodeId() == null;
-
- stats.onMessageProcessingStarted(msg);
-
- if (msg instanceof TcpDiscoveryNodeAddedMessage)
- processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
- else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
- processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
- else if (msg instanceof TcpDiscoveryNodeLeftMessage)
- processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
- else if (msg instanceof TcpDiscoveryNodeFailedMessage)
- processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
- else if (msg instanceof TcpDiscoveryHeartbeatMessage)
- processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
- else if (msg instanceof TcpDiscoveryClientReconnectMessage)
- processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
- else if (msg instanceof TcpDiscoveryCustomEventMessage)
- processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
- else if (msg instanceof TcpDiscoveryClientPingResponse)
- processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
- else if (msg instanceof TcpDiscoveryPingRequest)
- processPingRequest();
-
- stats.onMessageProcessingFinished(msg);
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
- if (getSpiContext().isStopping())
- return;
-
- TcpDiscoveryNode node = msg.node();
-
- UUID newNodeId = node.id();
-
- if (getLocalNodeId().equals(newNodeId)) {
- if (joinLatch.getCount() > 0) {
- Collection<TcpDiscoveryNode> top = msg.topology();
-
- if (top != null) {
- gridStartTime = msg.gridStartTime();
-
- for (TcpDiscoveryNode n : top) {
- if (n.order() > 0)
- n.visible(true);
-
- rmtNodes.put(n.id(), n);
- }
-
- topHist.clear();
-
- if (msg.topologyHistory() != null)
- topHist.putAll(msg.topologyHistory());
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding node added message with empty topology: " + msg);
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding node added message (this message has already been processed) " +
- "[msg=" + msg + ", locNode=" + locNode + ']');
- }
- else {
- boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
-
- if (topChanged) {
- if (log.isDebugEnabled())
- log.debug("Added new node to topology: " + node);
-
- Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
-
- if (data != null)
- onExchange(newNodeId, newNodeId, data, null);
- }
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
- if (getSpiContext().isStopping())
- return;
-
- if (getLocalNodeId().equals(msg.nodeId())) {
- if (joinLatch.getCount() > 0) {
- Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
-
- if (dataMap != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null);
- }
-
- locNode.setAttributes(msg.clientNodeAttributes());
- locNode.visible(true);
-
- long topVer = msg.topologyVersion();
-
- locNode.order(topVer);
-
- notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer));
-
- joinErr = null;
-
- joinLatch.countDown();
-
- stats.onJoinFinished();
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding node add finished message (this message has already been processed) " +
- "[msg=" + msg + ", locNode=" + locNode + ']');
- }
- else {
- TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
-
- return;
- }
-
- long topVer = msg.topologyVersion();
-
- node.order(topVer);
- node.visible(true);
-
- if (locNodeVer.equals(node.version()))
- node.version(locNodeVer);
-
- NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
-
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node add finished message (join process is not finished): " + msg);
-
- return;
- }
-
- notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
-
- stats.onNodeJoined();
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
- if (getLocalNodeId().equals(msg.creatorNodeId())) {
- if (log.isDebugEnabled())
- log.debug("Received node left message for local node: " + msg);
-
- leaveLatch.countDown();
- }
- else {
- if (getSpiContext().isStopping())
- return;
-
- TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
-
- return;
- }
-
- NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
-
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node left message (join process is not finished): " + msg);
-
- return;
- }
-
- notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
-
- stats.onNodeLeft();
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
- if (getSpiContext().isStopping()) {
- if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
- if (leaveLatch.getCount() > 0) {
- log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
- + ", rmtNode=" + msg.creatorNodeId() + ']');
-
- leaveLatch.countDown();
- }
- }
-
- return;
- }
-
- if (!getLocalNodeId().equals(msg.creatorNodeId())) {
- TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
-
- return;
- }
-
- NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
-
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node failed message (join process is not finished): " + msg);
-
- return;
- }
-
- notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
-
- stats.onNodeFailed();
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
- if (getSpiContext().isStopping())
- return;
-
- if (getLocalNodeId().equals(msg.creatorNodeId())) {
- assert msg.senderNodeId() != null;
-
- if (log.isDebugEnabled())
- log.debug("Received heartbeat response: " + msg);
- }
- else {
- long tstamp = U.currentTimeMillis();
-
- if (msg.hasMetrics()) {
- for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) {
- UUID nodeId = e.getKey();
-
- MetricsSet metricsSet = e.getValue();
-
- Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
- msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
-
- updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
-
- for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
- updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
- }
- }
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
- if (getSpiContext().isStopping())
- return;
-
- if (getLocalNodeId().equals(msg.creatorNodeId())) {
- assert msg.success();
-
- currSock = reconnector.sock;
-
- sockWriter.setSocket(currSock);
- sockReader.setSocket(currSock, locNode.clientRouterNodeId());
-
- reconnector = null;
-
- pending = true;
-
- try {
- for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
- processDiscoveryMessage(pendingMsg);
- }
- finally {
- pending = false;
- }
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding reconnect message for another client: " + msg);
- }
-
- /**
- * @param msg Message.
- */
- private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- if (msg.verified() && joinLatch.getCount() == 0) {
- DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;
-
- if (lsnr != null) {
- UUID nodeId = msg.creatorNodeId();
-
- TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
-
- if (node != null && node.visible()) {
- try {
- DiscoverySpiCustomMessage msgObj = msg.message(marsh);
-
- notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
- }
- catch (Throwable e) {
- U.error(log, "Failed to unmarshal discovery custom message.", e);
- }
- }
- else if (log.isDebugEnabled())
- log.debug("Received metrics from unknown node: " + nodeId);
- }
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
- GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());
-
- if (fut != null)
- fut.onDone(msg.result());
- }
-
- /**
- * Router want to ping this client.
- */
- private void processPingRequest() {
- TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(getLocalNodeId());
-
- res.client(true);
-
- sockWriter.sendMessage(res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param metrics Metrics.
- * @param cacheMetrics Cache metrics.
- * @param tstamp Timestamp.
- */
- private void updateMetrics(UUID nodeId,
- ClusterMetrics metrics,
- Map<Integer, CacheMetrics> cacheMetrics,
- long tstamp)
- {
- assert nodeId != null;
- assert metrics != null;
- assert cacheMetrics != null;
-
- TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
-
- if (node != null && node.visible()) {
- node.setMetrics(metrics);
- node.setCacheMetrics(cacheMetrics);
-
- node.lastUpdateTime(tstamp);
-
- notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
- }
- else if (log.isDebugEnabled())
- log.debug("Received metrics from unknown node: " + nodeId);
- }
-
- /**
- * @param type Event type.
- * @param topVer Topology version.
- * @param node Node.
- * @param top Topology snapshot.
- */
- private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) {
- notifyDiscovery(type, topVer, node, top, null);
- }
-
- /**
- * @param type Event type.
- * @param topVer Topology version.
- * @param node Node.
- * @param top Topology snapshot.
- */
- private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
- @Nullable DiscoverySpiCustomMessage data) {
- DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;
-
- if (lsnr != null) {
- if (log.isDebugEnabled())
- log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
- ", topVer=" + topVer + ']');
-
- lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data);
- }
- else if (log.isDebugEnabled())
- log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
- ", topVer=" + topVer + ']');
- }
-
- /**
- * @param msg Message.
- */
- public void addMessage(Object msg) {
- queue.add(msg);
- }
-
- /**
- *
- */
- public int queueSize() {
- return queue.size();
- }
- }
-
- /**
- *
- */
- private static class SocketClosedMessage {
- /** */
- private final Socket sock;
-
- /**
- * @param sock Socket.
- */
- private SocketClosedMessage(Socket sock) {
- this.sock = sock;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
deleted file mode 100644
index 3101da8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.mxbean.*;
-import org.apache.ignite.spi.*;
-
-import java.util.*;
-
-/**
- * Management bean for {@link TcpClientDiscoverySpi}.
- */
-public interface TcpClientDiscoverySpiMBean extends IgniteSpiManagementMBean {
- /**
- * Gets socket timeout.
- *
- * @return Socket timeout.
- */
- @MXBeanDescription("Socket timeout.")
- public long getSocketTimeout();
-
- /**
- * Gets message acknowledgement timeout.
- *
- * @return Message acknowledgement timeout.
- */
- @MXBeanDescription("Message acknowledgement timeout.")
- public long getAckTimeout();
-
- /**
- * Gets network timeout.
- *
- * @return Network timeout.
- */
- @MXBeanDescription("Network timeout.")
- public long getNetworkTimeout();
-
- /**
- * Gets thread priority. All threads within SPI will be started with it.
- *
- * @return Thread priority.
- */
- @MXBeanDescription("Threads priority.")
- public int getThreadPriority();
-
- /**
- * Gets delay between heartbeat messages sent by coordinator.
- *
- * @return Time period in milliseconds.
- */
- @MXBeanDescription("Heartbeat frequency.")
- public long getHeartbeatFrequency();
-
- /**
- * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
- *
- * @return IPFinder (string representation).
- */
- @MXBeanDescription("IP Finder.")
- public String getIpFinderFormatted();
-
- /**
- * Gets message worker queue current size.
- *
- * @return Message worker queue current size.
- */
- @MXBeanDescription("Message worker queue current size.")
- public int getMessageWorkerQueueSize();
-
- /**
- * Gets joined nodes count.
- *
- * @return Nodes joined count.
- */
- @MXBeanDescription("Nodes joined count.")
- public long getNodesJoined();
-
- /**
- * Gets left nodes count.
- *
- * @return Left nodes count.
- */
- @MXBeanDescription("Nodes left count.")
- public long getNodesLeft();
-
- /**
- * Gets failed nodes count.
- *
- * @return Failed nodes count.
- */
- @MXBeanDescription("Nodes failed count.")
- public long getNodesFailed();
-
- /**
- * Gets avg message processing time.
- *
- * @return Avg message processing time.
- */
- @MXBeanDescription("Avg message processing time.")
- public long getAvgMessageProcessingTime();
-
- /**
- * Gets max message processing time.
- *
- * @return Max message processing time.
- */
- @MXBeanDescription("Max message processing time.")
- public long getMaxMessageProcessingTime();
-
- /**
- * Gets total received messages count.
- *
- * @return Total received messages count.
- */
- @MXBeanDescription("Total received messages count.")
- public int getTotalReceivedMessages();
-
- /**
- * Gets received messages counts (grouped by type).
- *
- * @return Map containing message types and respective counts.
- */
- @MXBeanDescription("Received messages by type.")
- public Map<String, Integer> getReceivedMessages();
-
- /**
- * Gets total processed messages count.
- *
- * @return Total processed messages count.
- */
- @MXBeanDescription("Total processed messages count.")
- public int getTotalProcessedMessages();
-
- /**
- * Gets processed messages counts (grouped by type).
- *
- * @return Map containing message types and respective counts.
- */
- @MXBeanDescription("Received messages by type.")
- public Map<String, Integer> getProcessedMessages();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
new file mode 100644
index 0000000..8dad92a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+abstract class TcpDiscoveryImpl {
+ /** Response OK. */
+ protected static final int RES_OK = 1;
+
+ /** Response CONTINUE JOIN. */
+ protected static final int RES_CONTINUE_JOIN = 100;
+
+ /** Response WAIT. */
+ protected static final int RES_WAIT = 200;
+
+ /** */
+ protected final TcpDiscoverySpi adapter;
+
+ /** */
+ protected final IgniteLogger log;
+
+ /** */
+ protected TcpDiscoveryNode locNode;
+
+ /**
+ * @param adapter Adapter.
+ */
+ TcpDiscoveryImpl(TcpDiscoverySpi adapter) {
+ this.adapter = adapter;
+
+ log = adapter.log;
+ }
+
+ /**
+ *
+ */
+ public UUID getLocalNodeId() {
+ return adapter.getLocalNodeId();
+ }
+
+ /**
+ * @param msg Error message.
+ * @param e Exception.
+ */
+ protected void onException(String msg, Exception e){
+ adapter.getExceptionRegistry().onException(msg, e);
+ }
+
+ /**
+ * @param log Logger.
+ */
+ public abstract void dumpDebugInfo(IgniteLogger log);
+
+ /**
+ *
+ */
+ public abstract String getSpiState();
+
+ /**
+ *
+ */
+ public abstract int getMessageWorkerQueueSize();
+
+ /**
+ *
+ */
+ public abstract UUID getCoordinator();
+
+ /**
+ *
+ */
+ public abstract Collection<ClusterNode> getRemoteNodes();
+
+ /**
+ * @param nodeId Node id.
+ */
+ @Nullable public abstract ClusterNode getNode(UUID nodeId);
+
+ /**
+ * @param nodeId Node id.
+ */
+ public abstract boolean pingNode(UUID nodeId);
+
+ /**
+ *
+ */
+ public abstract void disconnect() throws IgniteSpiException;
+
+ /**
+ * @param auth Auth.
+ */
+ public abstract void setAuthenticator(DiscoverySpiNodeAuthenticator auth);
+
+ /**
+ * @param msg Message.
+ */
+ public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
+
+ /**
+ * @param nodeId Node id.
+ */
+ public abstract void failNode(UUID nodeId);
+
+ /**
+ * @param gridName Grid name.
+ */
+ public abstract void spiStart(@Nullable String gridName) throws IgniteSpiException;
+
+ /**
+ *
+ */
+ public abstract void spiStop() throws IgniteSpiException;
+
+ /**
+ * @param spiCtx Spi context.
+ */
+ public abstract void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException;
+
+ /**
+ * @param t Thread.
+ * @return Status as string.
+ */
+ protected static String threadStatus(Thread t) {
+ if (t == null)
+ return "N/A";
+
+ return t.isAlive() ? "alive" : "dead";
+ }
+
+ /**
+ * <strong>FOR TEST ONLY!!!</strong>
+ * <p>
+ * Simulates this node failure by stopping service threads. So, node will become
+ * unresponsive.
+ * <p>
+ * This method is intended for test purposes only.
+ */
+ abstract void simulateNodeFailure();
+
+ /**
+ * FOR TEST PURPOSE ONLY!
+ */
+ public abstract void brakeConnection();
+
+ /**
+ * FOR TEST PURPOSE ONLY!
+ */
+ protected abstract IgniteSpiThread workerThread();
+}