You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:40 UTC
[17/31] incubator-ignite git commit: ignite-471-2: huge merge from
sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
deleted file mode 100644
index 4196306..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ /dev/null
@@ -1,1264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.*;
-import org.apache.ignite.spi.discovery.tcp.internal.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
-import org.apache.ignite.spi.discovery.tcp.messages.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
-
-/**
- * Client discovery SPI implementation that uses TCP/IP for node discovery.
- * <p>
- * This discovery SPI requires at least on server node configured with
- * {@link TcpDiscoverySpi}. It will try to connect to random IP taken from
- * {@link TcpDiscoveryIpFinder} which should point to one of these server
- * nodes and will maintain connection only with this node (will not enter the ring).
- * If this connection is broken, it will try to reconnect using addresses from
- * the same IP finder.
- */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-@IgniteSpiMultipleInstancesSupport(true)
-@DiscoverySpiOrderSupport(true)
-@DiscoverySpiHistorySupport(true)
-public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean {
- /** Default disconnect check interval. */
- public static final long DFLT_DISCONNECT_CHECK_INT = 2000;
-
- /** Remote nodes. */
- private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
-
- /** Socket. */
- private volatile Socket sock;
-
- /** Socket reader. */
- private volatile SocketReader sockRdr;
-
- /** Heartbeat sender. */
- private volatile HeartbeatSender hbSender;
-
- /** Disconnect handler. */
- private volatile DisconnectHandler disconnectHnd;
-
- /** Last message ID. */
- private volatile IgniteUuid lastMsgId;
-
- /** Current topology version. */
- private volatile long topVer;
-
- /** Join error. */
- private IgniteSpiException joinErr;
-
- /** Whether reconnect failed. */
- private boolean reconFailed;
-
- /** Joined latch. */
- private CountDownLatch joinLatch;
-
- /** Left latch. */
- private volatile CountDownLatch leaveLatch;
-
- /** Disconnect check interval. */
- private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT;
-
- /** {@inheritDoc} */
- @Override public long getDisconnectCheckInterval() {
- return disconnectCheckInt;
- }
-
- /**
- * Sets disconnect check interval.
- *
- * @param disconnectCheckInt Disconnect check interval.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setDisconnectCheckInterval(long disconnectCheckInt) {
- this.disconnectCheckInt = disconnectCheckInt;
- }
-
- /** {@inheritDoc} */
- @Override public long getSocketTimeout() {
- return sockTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public long getAckTimeout() {
- return ackTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public long getNetworkTimeout() {
- return netTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public int getThreadPriority() {
- return threadPri;
- }
-
- /** {@inheritDoc} */
- @Override public long getHeartbeatFrequency() {
- return hbFreq;
- }
-
- /** {@inheritDoc} */
- @Override public String getIpFinderFormatted() {
- return ipFinder.toString();
- }
-
- /** {@inheritDoc} */
- @Override public int getMessageWorkerQueueSize() {
- SocketReader sockRdr0 = sockRdr;
-
- return sockRdr0 != null ? sockRdr0.msgWrk.queueSize() : 0;
- }
-
- /** {@inheritDoc} */
- @Override public long getNodesJoined() {
- return stats.joinedNodesCount();
- }
-
- /** {@inheritDoc} */
- @Override public long getNodesLeft() {
- return stats.leftNodesCount();
- }
-
- /** {@inheritDoc} */
- @Override public long getNodesFailed() {
- return stats.failedNodesCount();
- }
-
- /** {@inheritDoc} */
- @Override public long getAvgMessageProcessingTime() {
- return stats.avgMessageProcessingTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getMaxMessageProcessingTime() {
- return stats.maxMessageProcessingTime();
- }
-
- /** {@inheritDoc} */
- @Override public int getTotalReceivedMessages() {
- return stats.totalReceivedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, Integer> getReceivedMessages() {
- return stats.receivedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public int getTotalProcessedMessages() {
- return stats.totalProcessedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, Integer> getProcessedMessages() {
- return stats.processedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- startStopwatch();
-
- assertParameter(ipFinder != null, "ipFinder != null");
- assertParameter(netTimeout > 0, "networkTimeout > 0");
- assertParameter(sockTimeout > 0, "sockTimeout > 0");
- assertParameter(ackTimeout > 0, "ackTimeout > 0");
- assertParameter(hbFreq > 0, "heartbeatFreq > 0");
- assertParameter(threadPri > 0, "threadPri > 0");
-
- try {
- locHost = U.resolveLocalHost(locAddr);
- }
- catch (IOException e) {
- throw new IgniteSpiException("Unknown local address: " + locAddr, e);
- }
-
- if (log.isDebugEnabled()) {
- log.debug(configInfo("localHost", locHost.getHostAddress()));
- log.debug(configInfo("threadPri", threadPri));
- log.debug(configInfo("networkTimeout", netTimeout));
- log.debug(configInfo("sockTimeout", sockTimeout));
- log.debug(configInfo("ackTimeout", ackTimeout));
- log.debug(configInfo("ipFinder", ipFinder));
- log.debug(configInfo("heartbeatFreq", hbFreq));
- }
-
- // Warn on odd network timeout.
- if (netTimeout < 3000)
- U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
-
- registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class);
-
- try {
- locHost = U.resolveLocalHost(locAddr);
- }
- catch (IOException e) {
- throw new IgniteSpiException("Unknown local address: " + locAddr, e);
- }
-
- if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
- TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
-
- if (mcastIpFinder.getLocalAddress() == null)
- mcastIpFinder.setLocalAddress(locAddr);
- }
-
- IgniteBiTuple<Collection<String>, Collection<String>> addrs;
-
- try {
- addrs = U.resolveLocalAddresses(locHost);
- }
- catch (IOException | IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e);
- }
-
- locNode = new TcpDiscoveryNode(
- getLocalNodeId(),
- addrs.get1(),
- addrs.get2(),
- 0,
- metricsProvider,
- locNodeVer);
-
- locNode.setAttributes(locNodeAttrs);
- locNode.local(true);
-
- sockTimeoutWorker = new SocketTimeoutWorker();
- sockTimeoutWorker.start();
-
- joinTopology(false);
-
- disconnectHnd = new DisconnectHandler();
- disconnectHnd.start();
-
- if (log.isDebugEnabled())
- log.debug(startInfo());
- }
-
- /** {@inheritDoc} */
- @Override public void spiStop() throws IgniteSpiException {
- rmtNodes.clear();
-
- U.interrupt(disconnectHnd);
- U.join(disconnectHnd, log);
-
- U.interrupt(hbSender);
- U.join(hbSender, log);
-
- Socket sock0 = sock;
-
- sock = null;
-
- if (sock0 != null) {
- leaveLatch = new CountDownLatch(1);
-
- try {
- TcpDiscoveryNodeLeftMessage msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
-
- msg.client(true);
-
- writeToSocket(sock0, msg);
-
- if (!U.await(leaveLatch, netTimeout, MILLISECONDS)) {
- if (log.isDebugEnabled())
- log.debug("Did not receive node left message for local node (will stop anyway) [sock=" +
- sock0 + ']');
- }
- }
- catch (IOException | IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock0 + ']', e);
- }
- finally {
- U.closeQuiet(sock0);
- }
- }
-
- U.interrupt(sockRdr);
- U.join(sockRdr, log);
-
- U.interrupt(sockTimeoutWorker);
- U.join(sockTimeoutWorker, log);
-
- unregisterMBean();
-
- if (log.isDebugEnabled())
- log.debug(stopInfo());
- }
-
- /** {@inheritDoc} */
- @Override public Collection<Object> injectables() {
- return Arrays.<Object>asList(ipFinder);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> getRemoteNodes() {
- return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() {
- @Override public boolean apply(TcpDiscoveryNode node) {
- return node.visible();
- }
- }));
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public ClusterNode getNode(UUID nodeId) {
- if (getLocalNodeId().equals(nodeId))
- return locNode;
-
- TcpDiscoveryNode node = rmtNodes.get(nodeId);
-
- return node != null && node.visible() ? node : null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean pingNode(UUID nodeId) {
- assert nodeId != null;
-
- if (nodeId.equals(getLocalNodeId()))
- return true;
-
- TcpDiscoveryNode node = rmtNodes.get(nodeId);
-
- return node != null && node.visible();
- }
-
- /** {@inheritDoc} */
- @Override public void disconnect() throws IgniteSpiException {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void sendCustomEvent(Serializable evt) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
- ClusterNode node = rmtNodes.get(nodeId);
-
- if (node != null) {
- TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
- node.id(), node.order());
-
- sockRdr.addMessage(msg);
- }
- }
-
- /**
- * @param recon Reconnect flag.
- * @return Whether joined successfully.
- * @throws IgniteSpiException In case of error.
- */
- private boolean joinTopology(boolean recon) throws IgniteSpiException {
- if (!recon)
- stats.onJoinStarted();
-
- Collection<InetSocketAddress> addrs = null;
-
- while (!Thread.currentThread().isInterrupted()) {
- try {
- while (addrs == null || addrs.isEmpty()) {
- addrs = resolvedAddresses();
-
- if (!F.isEmpty(addrs)) {
- if (log.isDebugEnabled())
- log.debug("Resolved addresses from IP finder: " + addrs);
- }
- else {
- U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder);
-
- U.sleep(2000);
- }
- }
-
- Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
-
- Iterator<InetSocketAddress> it = addrs.iterator();
-
- while (it.hasNext() && !Thread.currentThread().isInterrupted()) {
- InetSocketAddress addr = it.next();
-
- Socket sock = null;
-
- try {
- long ts = U.currentTimeMillis();
-
- IgniteBiTuple<Socket, UUID> t = initConnection(addr);
-
- sock = t.get1();
-
- UUID rmtNodeId = t.get2();
-
- stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
-
- locNode.clientRouterNodeId(rmtNodeId);
-
- TcpDiscoveryAbstractMessage msg = recon ?
- new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
- lastMsgId) :
- new TcpDiscoveryJoinRequestMessage(locNode, null);
-
- msg.client(true);
-
- writeToSocket(sock, msg);
-
- int res = readReceipt(sock, ackTimeout);
-
- switch (res) {
- case RES_OK:
- this.sock = sock;
-
- sockRdr = new SocketReader(rmtNodeId, new MessageWorker(recon));
- sockRdr.start();
-
- if (U.await(joinLatch, netTimeout, MILLISECONDS)) {
- IgniteSpiException joinErr0 = joinErr;
-
- if (joinErr0 != null)
- throw joinErr0;
-
- if (reconFailed) {
- if (log.isDebugEnabled())
- log.debug("Failed to reconnect, will try to rejoin [locNode=" +
- locNode + ']');
-
- U.closeQuiet(sock);
-
- U.interrupt(sockRdr);
- U.join(sockRdr, log);
-
- this.sock = null;
-
- return false;
- }
-
- if (log.isDebugEnabled())
- log.debug("Successfully connected to topology [sock=" + sock + ']');
-
- hbSender = new HeartbeatSender();
- hbSender.start();
-
- stats.onJoinFinished();
-
- return true;
- }
- else {
- U.warn(log, "Join process timed out (will try other address) [sock=" + sock +
- ", timeout=" + netTimeout + ']');
-
- U.closeQuiet(sock);
-
- U.interrupt(sockRdr);
- U.join(sockRdr, log);
-
- it.remove();
-
- break;
- }
-
- case RES_CONTINUE_JOIN:
- case RES_WAIT:
- U.closeQuiet(sock);
-
- break;
-
- default:
- if (log.isDebugEnabled())
- log.debug("Received unexpected response to join request: " + res);
-
- U.closeQuiet(sock);
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Joining thread was interrupted.");
-
- return false;
- }
- catch (IOException | IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to establish connection with address: " + addr, e);
-
- U.closeQuiet(sock);
-
- it.remove();
- }
- }
-
- if (addrs.isEmpty()) {
- U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
- "in 2000ms): " + addrs0);
-
- U.sleep(2000);
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Joining thread was interrupted.");
- }
- }
-
- return false;
- }
-
- /**
- * @param addr Address.
- * @return Remote node ID.
- * @throws IOException In case of I/O error.
- * @throws IgniteCheckedException In case of other error.
- */
- private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
- assert addr != null;
-
- joinLatch = new CountDownLatch(1);
-
- Socket sock = openSocket(addr);
-
- TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
-
- req.client(true);
-
- writeToSocket(sock, req);
-
- TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout);
-
- UUID nodeId = res.creatorNodeId();
-
- assert nodeId != null;
- assert !getLocalNodeId().equals(nodeId);
-
- return F.t(sock, nodeId);
- }
-
- /**
- * FOR TEST PURPOSE ONLY!
- */
- void simulateNodeFailure() {
- U.warn(log, "Simulating client node failure: " + getLocalNodeId());
-
- U.closeQuiet(sock);
-
- U.interrupt(disconnectHnd);
- U.join(disconnectHnd, log);
-
- U.interrupt(hbSender);
- U.join(hbSender, log);
-
- U.interrupt(sockRdr);
- U.join(sockRdr, log);
-
- U.interrupt(sockTimeoutWorker);
- U.join(sockTimeoutWorker, log);
- }
-
- /**
- * Disconnect handler.
- */
- private class DisconnectHandler extends IgniteSpiThread {
- /**
- */
- protected DisconnectHandler() {
- super(gridName, "tcp-client-disco-disconnect-hnd", log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- while (!isInterrupted()) {
- try {
- U.sleep(disconnectCheckInt);
-
- if (sock == null) {
- if (log.isDebugEnabled())
- log.debug("Node is disconnected from topology, will try to reconnect.");
-
- U.interrupt(hbSender);
- U.join(hbSender, log);
-
- U.interrupt(sockRdr);
- U.join(sockRdr, log);
-
- // If reconnection fails, try to rejoin.
- if (!joinTopology(true)) {
- rmtNodes.clear();
-
- locNode.order(0);
-
- joinTopology(false);
-
- getSpiContext().recordEvent(new DiscoveryEvent(locNode,
- "Client node reconnected: " + locNode,
- EVT_CLIENT_NODE_RECONNECTED, locNode));
- }
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Disconnect handler was interrupted.");
-
- return;
- }
- catch (IgniteSpiException e) {
- U.error(log, "Failed to reconnect to topology after failure.", e);
- }
- }
- }
- }
-
- /**
- * Heartbeat sender.
- */
- private class HeartbeatSender extends IgniteSpiThread {
- /**
- */
- protected HeartbeatSender() {
- super(gridName, "tcp-client-disco-heartbeat-sender", log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- Socket sock0 = sock;
-
- if (sock0 == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to start heartbeat sender, node is already disconnected.");
-
- return;
- }
-
- try {
- while (!isInterrupted()) {
- U.sleep(hbFreq);
-
- TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());
-
- msg.client(true);
-
- sockRdr.addMessage(msg);
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Heartbeat sender was interrupted.");
- }
- }
- }
-
- /**
- * Socket reader.
- */
- private class SocketReader extends IgniteSpiThread {
- /** Remote node ID. */
- private final UUID nodeId;
-
- /** Message worker. */
- private final MessageWorker msgWrk;
-
- /**
- * @param nodeId Node ID.
- * @param msgWrk Message worker.
- */
- protected SocketReader(UUID nodeId, MessageWorker msgWrk) {
- super(gridName, "tcp-client-disco-sock-reader", log);
-
- assert nodeId != null;
- assert msgWrk != null;
-
- this.nodeId = nodeId;
- this.msgWrk = msgWrk;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void start() {
- super.start();
-
- msgWrk.start();
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- Socket sock0 = sock;
-
- if (sock0 == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to start socket reader, node is already disconnected.");
-
- return;
- }
-
- try {
- InputStream in = new BufferedInputStream(sock0.getInputStream());
-
- sock0.setKeepAlive(true);
- sock0.setTcpNoDelay(true);
-
- while (!isInterrupted()) {
- try {
- TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader());
-
- msg.senderNodeId(nodeId);
-
- if (log.isDebugEnabled())
- log.debug("Message has been received: " + msg);
-
- stats.onMessageReceived(msg);
-
- if (joinLatch.getCount() > 0) {
- IgniteSpiException err = null;
-
- if (msg instanceof TcpDiscoveryDuplicateIdMessage)
- err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
- else if (msg instanceof TcpDiscoveryAuthFailedMessage)
- err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
- else if (msg instanceof TcpDiscoveryCheckFailedMessage)
- err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
-
- if (err != null) {
- joinErr = err;
-
- joinLatch.countDown();
-
- return;
- }
- }
-
- msgWrk.addMessage(msg);
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to read message [sock=" + sock0 + ", " +
- "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e);
-
- IOException ioEx = X.cause(e, IOException.class);
-
- if (ioEx != null)
- throw ioEx;
-
- ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
-
- if (clsNotFoundEx != null)
- LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
- "(make sure same versions of all classes are available on all nodes) " +
- "[rmtNodeId=" + nodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
- else
- LT.error(log, e, "Failed to read message [sock=" + sock0 + ", locNodeId=" +
- getLocalNodeId() + ", rmtNodeId=" + nodeId + ']');
- }
- }
- }
- catch (IOException e) {
- if (log.isDebugEnabled())
- U.error(log, "Connection failed [sock=" + sock0 + ", locNodeId=" +
- getLocalNodeId() + ", rmtNodeId=" + nodeId + ']', e);
- }
- finally {
- U.closeQuiet(sock0);
-
- U.interrupt(msgWrk);
-
- try {
- U.join(msgWrk);
- }
- catch (IgniteInterruptedCheckedException ignored) {
- // No-op.
- }
-
- sock = null;
- }
- }
-
- /**
- * @param msg Message.
- */
- void addMessage(TcpDiscoveryAbstractMessage msg) {
- assert msg != null;
-
- msgWrk.addMessage(msg);
- }
- }
-
- /**
- * Message worker.
- */
- private class MessageWorker extends MessageWorkerAdapter {
- /** Topology history. */
- private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
-
- /** Indicates that reconnection is in progress. */
- private boolean recon;
-
- /** Indicates that pending messages are currently processed. */
- private boolean pending;
-
- /**
- * @param recon Whether reconnection is in progress.
- */
- protected MessageWorker(boolean recon) {
- super("tcp-client-disco-msg-worker");
-
- this.recon = recon;
- }
-
- /** {@inheritDoc} */
- @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
- assert msg != null;
- assert msg.verified() || msg.senderNodeId() == null;
-
- stats.onMessageProcessingStarted(msg);
-
- if (msg instanceof TcpDiscoveryClientReconnectMessage)
- processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
- else {
- if (recon && !pending) {
- if (log.isDebugEnabled())
- log.debug("Discarding message received during reconnection: " + msg);
- }
- else {
- if (msg instanceof TcpDiscoveryNodeAddedMessage)
- processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
- else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
- processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
- else if (msg instanceof TcpDiscoveryNodeLeftMessage)
- processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
- else if (msg instanceof TcpDiscoveryNodeFailedMessage)
- processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
- else if (msg instanceof TcpDiscoveryHeartbeatMessage)
- processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
-
- if (ensured(msg))
- lastMsgId = msg.id();
- }
- }
-
- stats.onMessageProcessingFinished(msg);
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
- if (leaveLatch != null)
- return;
-
- TcpDiscoveryNode node = msg.node();
-
- UUID newNodeId = node.id();
-
- if (getLocalNodeId().equals(newNodeId)) {
- if (joinLatch.getCount() > 0) {
- Collection<TcpDiscoveryNode> top = msg.topology();
-
- if (top != null) {
- gridStartTime = msg.gridStartTime();
-
- for (TcpDiscoveryNode n : top) {
- if (n.order() > 0)
- n.visible(true);
-
- rmtNodes.put(n.id(), n);
- }
-
- topHist.clear();
-
- if (msg.topologyHistory() != null)
- topHist.putAll(msg.topologyHistory());
-
- Map<UUID, Map<Integer, byte[]>> dataMap = msg.oldNodesDiscoveryData();
-
- if (dataMap != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- onExchange(newNodeId, entry.getKey(), entry.getValue(), null);
- }
-
- locNode.setAttributes(node.attributes());
- locNode.visible(true);
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding node added message with empty topology: " + msg);
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding node added message (this message has already been processed) " +
- "[msg=" + msg + ", locNode=" + locNode + ']');
- }
- else {
- boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
-
- if (topChanged) {
- if (log.isDebugEnabled())
- log.debug("Added new node to topology: " + node);
-
- Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
-
- if (data != null)
- onExchange(newNodeId, newNodeId, data, null);
- }
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
- if (leaveLatch != null)
- return;
-
- if (getLocalNodeId().equals(msg.nodeId())) {
- if (joinLatch.getCount() > 0) {
- long topVer = msg.topologyVersion();
-
- locNode.order(topVer);
-
- notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer));
-
- joinErr = null;
-
- joinLatch.countDown();
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding node add finished message (this message has already been processed) " +
- "[msg=" + msg + ", locNode=" + locNode + ']');
- }
- else {
- TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
-
- return;
- }
-
- long topVer = msg.topologyVersion();
-
- node.order(topVer);
- node.visible(true);
-
- if (locNodeVer.equals(node.version()))
- node.version(locNodeVer);
-
- Collection<ClusterNode> top = updateTopologyHistory(topVer);
-
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node add finished message (join process is not finished): " + msg);
-
- return;
- }
-
- notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
-
- stats.onNodeJoined();
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
- if (getLocalNodeId().equals(msg.creatorNodeId())) {
- if (log.isDebugEnabled())
- log.debug("Received node left message for local node: " + msg);
-
- CountDownLatch leaveLatch0 = leaveLatch;
-
- assert leaveLatch0 != null;
-
- leaveLatch0.countDown();
- }
- else {
- if (leaveLatch != null)
- return;
-
- TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
-
- return;
- }
-
- Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
-
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node left message (join process is not finished): " + msg);
-
- return;
- }
-
- notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
-
- stats.onNodeLeft();
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
- if (leaveLatch != null)
- return;
-
- if (!getLocalNodeId().equals(msg.creatorNodeId())) {
- TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
-
- return;
- }
-
- Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
-
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node failed message (join process is not finished): " + msg);
-
- return;
- }
-
- notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
-
- stats.onNodeFailed();
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
- if (leaveLatch != null)
- return;
-
- if (getLocalNodeId().equals(msg.creatorNodeId())) {
- if (msg.senderNodeId() == null) {
- Socket sock0 = sock;
-
- if (sock0 != null) {
- UUID nodeId = ignite.configuration().getNodeId();
-
- msg.setMetrics(nodeId, metricsProvider.metrics());
-
- msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics());
-
- try {
- writeToSocket(sock0, msg);
-
- if (log.isDebugEnabled())
- log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']');
- }
- catch (IOException | IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to send heartbeat message [sock=" + sock0 +
- ", msg=" + msg + ']', e);
-
- U.closeQuiet(sock0);
-
- sock = null;
-
- interrupt();
- }
- }
- else if (log.isDebugEnabled())
- log.debug("Failed to send heartbeat message (node is disconnected): " + msg);
- }
- else if (log.isDebugEnabled())
- log.debug("Received heartbeat response: " + msg);
- }
- else {
- long tstamp = U.currentTimeMillis();
-
- if (msg.hasMetrics()) {
- for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) {
- UUID nodeId = e.getKey();
-
- MetricsSet metricsSet = e.getValue();
-
- Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
- msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
-
- updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
-
- for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
- updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
- }
- }
- }
- }
-
- /**
- * @param msg Message.
- */
- private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
- if (leaveLatch != null)
- return;
-
- if (getLocalNodeId().equals(msg.creatorNodeId())) {
- if (msg.success()) {
- pending = true;
-
- try {
- for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
- processMessage(pendingMsg);
- }
- finally {
- pending = false;
- }
-
- joinErr = null;
- reconFailed = false;
-
- joinLatch.countDown();
- }
- else {
- joinErr = null;
- reconFailed = true;
-
- getSpiContext().recordEvent(new DiscoveryEvent(locNode,
- "Client node disconnected: " + locNode,
- EVT_CLIENT_NODE_DISCONNECTED, locNode));
-
- joinLatch.countDown();
- }
- }
- else if (log.isDebugEnabled())
- log.debug("Discarding reconnect message for another client: " + msg);
- }
-
- /**
- * @param nodeId Node ID.
- * @param metrics Metrics.
- * @param cacheMetrics Cache metrics.
- * @param tstamp Timestamp.
- */
- private void updateMetrics(UUID nodeId,
- ClusterMetrics metrics,
- Map<Integer, CacheMetrics> cacheMetrics,
- long tstamp)
- {
- assert nodeId != null;
- assert metrics != null;
- assert cacheMetrics != null;
-
- TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
-
- if (node != null && node.visible()) {
- node.setMetrics(metrics);
- node.setCacheMetrics(cacheMetrics);
-
- node.lastUpdateTime(tstamp);
-
- notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes());
- }
- else if (log.isDebugEnabled())
- log.debug("Received metrics from unknown node: " + nodeId);
- }
-
- /**
- * @param topVer New topology version.
- * @return Latest topology snapshot.
- */
- private Collection<ClusterNode> updateTopologyHistory(long topVer) {
- TcpClientDiscoverySpi.this.topVer = topVer;
-
- Collection<ClusterNode> allNodes = allNodes();
-
- if (!topHist.containsKey(topVer)) {
- assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
- "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
-
- topHist.put(topVer, allNodes);
-
- if (topHist.size() > topHistSize)
- topHist.pollFirstEntry();
-
- assert topHist.lastKey() == topVer;
- assert topHist.size() <= topHistSize;
- }
-
- return allNodes;
- }
-
- /**
- * @return All nodes.
- */
- private Collection<ClusterNode> allNodes() {
- Collection<ClusterNode> allNodes = new TreeSet<>();
-
- for (TcpDiscoveryNode node : rmtNodes.values()) {
- if (node.visible())
- allNodes.add(node);
- }
-
- allNodes.add(locNode);
-
- return allNodes;
- }
-
- /**
- * @param type Event type.
- * @param topVer Topology version.
- * @param node Node.
- * @param top Topology snapshot.
- */
- private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
- DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;
-
- if (lsnr != null) {
- if (log.isDebugEnabled())
- log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
- ", topVer=" + topVer + ']');
-
- lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), null);
- }
- else if (log.isDebugEnabled())
- log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
- ", topVer=" + topVer + ']');
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
deleted file mode 100644
index 9fe4adc..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.mxbean.*;
-import org.apache.ignite.spi.*;
-
-import java.util.*;
-
-/**
- * Management bean for {@link TcpClientDiscoverySpi}.
- */
-public interface TcpClientDiscoverySpiMBean extends IgniteSpiManagementMBean {
- /**
- * Gets disconnect check interval.
- *
- * @return Disconnect check interval.
- */
- @MXBeanDescription("Disconnect check interval.")
- public long getDisconnectCheckInterval();
-
- /**
- * Gets socket timeout.
- *
- * @return Socket timeout.
- */
- @MXBeanDescription("Socket timeout.")
- public long getSocketTimeout();
-
- /**
- * Gets message acknowledgement timeout.
- *
- * @return Message acknowledgement timeout.
- */
- @MXBeanDescription("Message acknowledgement timeout.")
- public long getAckTimeout();
-
- /**
- * Gets network timeout.
- *
- * @return Network timeout.
- */
- @MXBeanDescription("Network timeout.")
- public long getNetworkTimeout();
-
- /**
- * Gets thread priority. All threads within SPI will be started with it.
- *
- * @return Thread priority.
- */
- @MXBeanDescription("Threads priority.")
- public int getThreadPriority();
-
- /**
- * Gets delay between heartbeat messages sent by coordinator.
- *
- * @return Time period in milliseconds.
- */
- @MXBeanDescription("Heartbeat frequency.")
- public long getHeartbeatFrequency();
-
- /**
- * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
- *
- * @return IPFinder (string representation).
- */
- @MXBeanDescription("IP Finder.")
- public String getIpFinderFormatted();
-
- /**
- * Gets message worker queue current size.
- *
- * @return Message worker queue current size.
- */
- @MXBeanDescription("Message worker queue current size.")
- public int getMessageWorkerQueueSize();
-
- /**
- * Gets joined nodes count.
- *
- * @return Nodes joined count.
- */
- @MXBeanDescription("Nodes joined count.")
- public long getNodesJoined();
-
- /**
- * Gets left nodes count.
- *
- * @return Left nodes count.
- */
- @MXBeanDescription("Nodes left count.")
- public long getNodesLeft();
-
- /**
- * Gets failed nodes count.
- *
- * @return Failed nodes count.
- */
- @MXBeanDescription("Nodes failed count.")
- public long getNodesFailed();
-
- /**
- * Gets avg message processing time.
- *
- * @return Avg message processing time.
- */
- @MXBeanDescription("Avg message processing time.")
- public long getAvgMessageProcessingTime();
-
- /**
- * Gets max message processing time.
- *
- * @return Max message processing time.
- */
- @MXBeanDescription("Max message processing time.")
- public long getMaxMessageProcessingTime();
-
- /**
- * Gets total received messages count.
- *
- * @return Total received messages count.
- */
- @MXBeanDescription("Total received messages count.")
- public int getTotalReceivedMessages();
-
- /**
- * Gets received messages counts (grouped by type).
- *
- * @return Map containing message types and respective counts.
- */
- @MXBeanDescription("Received messages by type.")
- public Map<String, Integer> getReceivedMessages();
-
- /**
- * Gets total processed messages count.
- *
- * @return Total processed messages count.
- */
- @MXBeanDescription("Total processed messages count.")
- public int getTotalProcessedMessages();
-
- /**
- * Gets processed messages counts (grouped by type).
- *
- * @return Map containing message types and respective counts.
- */
- @MXBeanDescription("Received messages by type.")
- public Map<String, Integer> getProcessedMessages();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
new file mode 100644
index 0000000..b7e9e53
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+abstract class TcpDiscoveryImpl {
+ /** Response OK. */
+ protected static final int RES_OK = 1;
+
+ /** Response CONTINUE JOIN. */
+ protected static final int RES_CONTINUE_JOIN = 100;
+
+ /** Response WAIT. */
+ protected static final int RES_WAIT = 200;
+
+ /** */
+ protected final TcpDiscoverySpi spi;
+
+ /** */
+ protected final IgniteLogger log;
+
+ /** */
+ protected TcpDiscoveryNode locNode;
+
+ /**
+ * @param spi Adapter.
+ */
+ TcpDiscoveryImpl(TcpDiscoverySpi spi) {
+ this.spi = spi;
+
+ log = spi.log;
+ }
+
+ /**
+ * @return Local node ID.
+ */
+ public UUID getLocalNodeId() {
+ return spi.getLocalNodeId();
+ }
+
+ /**
+ * @param msg Error message.
+ * @param e Exception.
+ */
+ protected void onException(String msg, Exception e){
+ spi.getExceptionRegistry().onException(msg, e);
+ }
+
+ /**
+ * @param log Logger.
+ */
+ public abstract void dumpDebugInfo(IgniteLogger log);
+
+ /**
+ * @return SPI state string.
+ */
+ public abstract String getSpiState();
+
+ /**
+ * @return Message worker queue current size.
+ */
+ public abstract int getMessageWorkerQueueSize();
+
+ /**
+ * @return Coordinator ID.
+ */
+ public abstract UUID getCoordinator();
+
+ /**
+ * @return Collection of remote nodes.
+ */
+ public abstract Collection<ClusterNode> getRemoteNodes();
+
+ /**
+ * @param nodeId Node id.
+ * @return Node with given ID or {@code null} if node is not found.
+ */
+ @Nullable public abstract ClusterNode getNode(UUID nodeId);
+
+ /**
+ * @param nodeId Node id.
+ * @return {@code true} if node alive, {@code false} otherwise.
+ */
+ public abstract boolean pingNode(UUID nodeId);
+
+ /**
+ * Tells discovery SPI to disconnect from topology.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public abstract void disconnect() throws IgniteSpiException;
+
+ /**
+ * @param msg Message.
+ * @throws IgniteException If failed.
+ */
+ public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
+
+ /**
+ * @param nodeId Node id.
+ */
+ public abstract void failNode(UUID nodeId);
+
+ /**
+ * @param gridName Grid name.
+ * @throws IgniteSpiException If failed.
+ */
+ public abstract void spiStart(@Nullable String gridName) throws IgniteSpiException;
+
+ /**
+ * @throws IgniteSpiException If failed.
+ */
+ public abstract void spiStop() throws IgniteSpiException;
+
+ /**
+ * @param spiCtx Spi context.
+ * @throws IgniteSpiException If failed.
+ */
+ public abstract void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException;
+
+ /**
+ * @param t Thread.
+ * @return Status as string.
+ */
+ protected static String threadStatus(Thread t) {
+ if (t == null)
+ return "N/A";
+
+ return t.isAlive() ? "alive" : "dead";
+ }
+
+ /**
+ * <strong>FOR TEST ONLY!!!</strong>
+ * <p>
+ * Simulates this node failure by stopping service threads. So, node will become
+ * unresponsive.
+ * <p>
+ * This method is intended for test purposes only.
+ */
+ abstract void simulateNodeFailure();
+
+ /**
+ * FOR TEST PURPOSE ONLY!
+ */
+ public abstract void brakeConnection();
+
+ /**
+ * <strong>FOR TEST ONLY!!!</strong>
+ *
+ * @return Worker thread.
+ */
+ protected abstract IgniteSpiThread workerThread();
+
+ /**
+ * @throws IgniteSpiException If failed.
+ */
+ @SuppressWarnings("BusyWait")
+ protected final void registerLocalNodeAddress() throws IgniteSpiException {
+ // Make sure address registration succeeded.
+ while (true) {
+ try {
+ spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+ // Success.
+ break;
+ }
+ catch (IllegalStateException e) {
+ throw new IgniteSpiException("Failed to register local node address with IP finder: " +
+ locNode.socketAddresses(), e);
+ }
+ catch (IgniteSpiException e) {
+ LT.error(log, e, "Failed to register local node address in IP finder on start " +
+ "(retrying every 2000 ms).");
+ }
+
+ try {
+ U.sleep(2000);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteSpiException("Thread has been interrupted.", e);
+ }
+ }
+ }
+}