You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/28 13:47:21 UTC
[2/6] incubator-ignite git commit: # IGNITE-943 Rename TcpDiscoveryImpl.adapter to spi
# IGNITE-943 Rename TcpDiscoveryImpl.adapter to spi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0e192ef8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0e192ef8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0e192ef8
Branch: refs/heads/ignite-943
Commit: 0e192ef84def1cdfe121e1e132c0b9740341fb9c
Parents: 92b2a57
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 28 12:11:22 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 28 14:46:07 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 138 ++++----
.../ignite/spi/discovery/tcp/ServerImpl.java | 348 +++++++++----------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 14 +-
3 files changed, 250 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0e192ef8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index aa254ec..2171085 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -116,7 +116,7 @@ class ClientImpl extends TcpDiscoveryImpl {
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
- b.append(" Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl());
+ b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
b.append(U.nl());
@@ -127,7 +127,7 @@ class ClientImpl extends TcpDiscoveryImpl {
b.append(U.nl());
- b.append("Stats: ").append(adapter.stats).append(U.nl());
+ b.append("Stats: ").append(spi.stats).append(U.nl());
U.quietAndInfo(log, b.toString());
}
@@ -153,9 +153,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- adapter.initLocalNode(0, true);
+ spi.initLocalNode(0, true);
- locNode = adapter.locNode;
+ locNode = spi.locNode;
sockWriter = new SocketWriter();
sockWriter.start();
@@ -176,9 +176,9 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
- timer.schedule(new HeartbeatSender(), adapter.hbFreq, adapter.hbFreq);
+ timer.schedule(new HeartbeatSender(), spi.hbFreq, spi.hbFreq);
- adapter.printStartInfo();
+ spi.printStartInfo();
}
/** {@inheritDoc} */
@@ -189,7 +189,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(SPI_STOP);
try {
- if (!leaveLatch.await(adapter.netTimeout, MILLISECONDS))
+ if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
}
catch (InterruptedException ignored) {
@@ -210,7 +210,7 @@ class ClientImpl extends TcpDiscoveryImpl {
U.join(sockWriter, log);
U.join(sockReader, log);
- adapter.printStopInfo();
+ spi.printStopInfo();
}
/** {@inheritDoc} */
@@ -253,7 +253,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (oldFut != null)
fut = oldFut;
else {
- if (adapter.getSpiContext().isStopping()) {
+ if (spi.getSpiContext().isStopping()) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(false);
@@ -267,7 +267,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (pingFuts.remove(nodeId, finalFut))
finalFut.onDone(false);
}
- }, adapter.netTimeout);
+ }, spi.netTimeout);
sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
}
@@ -297,13 +297,13 @@ class ClientImpl extends TcpDiscoveryImpl {
leaveLatch.countDown();
joinLatch.countDown();
- adapter.getSpiContext().deregisterPorts();
+ spi.getSpiContext().deregisterPorts();
Collection<ClusterNode> rmts = getRemoteNodes();
// This is restart/disconnection and remote nodes are not empty.
// We need to fire FAIL event for each.
- DiscoverySpiListener lsnr = adapter.lsnr;
+ DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
for (ClusterNode n : rmts) {
@@ -330,7 +330,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
- adapter.marsh.marshal(evt)));
+ spi.marsh.marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -364,16 +364,16 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new InterruptedException();
while (addrs == null || addrs.isEmpty()) {
- addrs = adapter.resolvedAddresses();
+ addrs = spi.resolvedAddresses();
if (!F.isEmpty(addrs)) {
if (log.isDebugEnabled())
log.debug("Resolved addresses from IP finder: " + addrs);
}
else {
- U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + adapter.ipFinder);
+ U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder);
- if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout)
+ if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
return null;
Thread.sleep(2000);
@@ -401,20 +401,20 @@ class ClientImpl extends TcpDiscoveryImpl {
UUID rmtNodeId = t.get2();
- adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+ spi.stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
locNode.clientRouterNodeId(rmtNodeId);
TcpDiscoveryAbstractMessage msg = recon ?
new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
lastMsgId) :
- new TcpDiscoveryJoinRequestMessage(locNode, adapter.collectExchangeData(getLocalNodeId()));
+ new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
msg.client(true);
- adapter.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg);
- int res = adapter.readReceipt(sock, adapter.ackTimeout);
+ int res = spi.readReceipt(sock, spi.ackTimeout);
switch (res) {
case RES_OK:
@@ -447,7 +447,7 @@ class ClientImpl extends TcpDiscoveryImpl {
U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
"in 2000ms): " + addrs0);
- if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout)
+ if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
return null;
Thread.sleep(2000);
@@ -470,11 +470,11 @@ class ClientImpl extends TcpDiscoveryImpl {
topHist.put(topVer, allNodes);
- if (topHist.size() > adapter.topHistSize)
+ if (topHist.size() > spi.topHistSize)
topHist.pollFirstEntry();
assert topHist.lastKey() == topVer;
- assert topHist.size() <= adapter.topHistSize;
+ assert topHist.size() <= spi.topHistSize;
}
return allNodes;
@@ -505,15 +505,15 @@ class ClientImpl extends TcpDiscoveryImpl {
private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
assert addr != null;
- Socket sock = adapter.openSocket(addr);
+ Socket sock = spi.openSocket(addr);
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
req.client(true);
- adapter.writeToSocket(sock, req);
+ spi.writeToSocket(sock, req);
- TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, adapter.ackTimeout);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, spi.ackTimeout);
UUID nodeId = res.creatorNodeId();
@@ -529,11 +529,11 @@ class ClientImpl extends TcpDiscoveryImpl {
U.interrupt(sockWriter);
U.interrupt(msgWorker);
- U.interrupt(adapter.sockTimeoutWorker);
+ U.interrupt(spi.sockTimeoutWorker);
U.join(sockWriter, log);
U.join(msgWorker, log);
- U.join(adapter.sockTimeoutWorker, log);
+ U.join(spi.sockTimeoutWorker, log);
}
/** {@inheritDoc} */
@@ -569,9 +569,9 @@ class ClientImpl extends TcpDiscoveryImpl {
private class HeartbeatSender extends TimerTask {
/** {@inheritDoc} */
@Override public void run() {
- if (!adapter.getSpiContext().isStopping() && sockWriter.isOnline()) {
+ if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) {
TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
- adapter.metricsProvider.metrics());
+ spi.metricsProvider.metrics());
msg.client(true);
@@ -596,7 +596,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
*/
protected SocketReader() {
- super(adapter.ignite().name(), "tcp-client-disco-sock-reader", log);
+ super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
}
/**
@@ -640,7 +640,7 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryAbstractMessage msg;
try {
- msg = adapter.marsh.unmarshal(in, U.gridClassLoader());
+ msg = spi.marsh.unmarshal(in, U.gridClassLoader());
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -670,9 +670,9 @@ class ClientImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Message has been received: " + msg);
- adapter.stats.onMessageReceived(msg);
+ spi.stats.onMessageReceived(msg);
- if (adapter.ensured(msg))
+ if (spi.ensured(msg))
lastMsgId = msg.id();
msgWorker.addMessage(msg);
@@ -715,7 +715,7 @@ class ClientImpl extends TcpDiscoveryImpl {
*
*/
protected SocketWriter() {
- super(adapter.ignite().name(), "tcp-client-disco-sock-writer", log);
+ super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
}
/**
@@ -775,11 +775,11 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs)
+ for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
msgLsnr.apply(msg);
try {
- adapter.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg);
msg = null;
}
@@ -814,7 +814,7 @@ class ClientImpl extends TcpDiscoveryImpl {
*
*/
protected Reconnector() {
- super(adapter.ignite().name(), "tcp-client-disco-msg-worker", log);
+ super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
}
/**
@@ -851,7 +851,7 @@ class ClientImpl extends TcpDiscoveryImpl {
// Wait for
while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = adapter.marsh.unmarshal(in, U.gridClassLoader());
+ TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
@@ -902,13 +902,13 @@ class ClientImpl extends TcpDiscoveryImpl {
*
*/
private MessageWorker() {
- super(adapter.ignite().name(), "tcp-client-disco-msg-worker", log);
+ super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
}
/** {@inheritDoc} */
@SuppressWarnings("InfiniteLoopStatement")
@Override protected void body() throws InterruptedException {
- adapter.stats.onJoinStarted();
+ spi.stats.onJoinStarted();
try {
final Socket sock = joinTopology(false);
@@ -930,7 +930,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (joinLatch.getCount() > 0)
queue.add(JOIN_TIMEOUT);
}
- }, adapter.netTimeout);
+ }, spi.netTimeout);
sockReader.setSocket(sock, locNode.clientRouterNodeId());
@@ -940,7 +940,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (msg == JOIN_TIMEOUT) {
if (joinLatch.getCount() > 0) {
joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
- ", timeout=" + adapter.netTimeout + ']');
+ ", timeout=" + spi.netTimeout + ']');
joinLatch.countDown();
@@ -948,7 +948,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
else if (msg == SPI_STOP) {
- assert adapter.getSpiContext().isStopping();
+ assert spi.getSpiContext().isStopping();
if (currSock != null) {
TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
@@ -972,7 +972,7 @@ class ClientImpl extends TcpDiscoveryImpl {
break;
}
else {
- if (adapter.getSpiContext().isStopping() || segmented)
+ if (spi.getSpiContext().isStopping() || segmented)
leaveLatch.countDown();
else {
assert reconnector == null;
@@ -986,7 +986,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (reconnector.isAlive())
reconnector.cancel();
}
- }, adapter.netTimeout);
+ }, spi.netTimeout);
}
}
}
@@ -1008,11 +1008,11 @@ class ClientImpl extends TcpDiscoveryImpl {
IgniteSpiException err = null;
if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
- err = adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+ err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
- err = adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+ err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
- err = adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+ err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
if (err != null) {
joinErr = err;
@@ -1052,7 +1052,7 @@ class ClientImpl extends TcpDiscoveryImpl {
assert msg != null;
assert msg.verified() || msg.senderNodeId() == null;
- adapter.stats.onMessageProcessingStarted(msg);
+ spi.stats.onMessageProcessingStarted(msg);
if (msg instanceof TcpDiscoveryNodeAddedMessage)
processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
@@ -1073,14 +1073,14 @@ class ClientImpl extends TcpDiscoveryImpl {
else if (msg instanceof TcpDiscoveryPingRequest)
processPingRequest();
- adapter.stats.onMessageProcessingFinished(msg);
+ spi.stats.onMessageProcessingFinished(msg);
}
/**
* @param msg Message.
*/
private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
- if (adapter.getSpiContext().isStopping())
+ if (spi.getSpiContext().isStopping())
return;
TcpDiscoveryNode node = msg.node();
@@ -1092,7 +1092,7 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<TcpDiscoveryNode> top = msg.topology();
if (top != null) {
- adapter.gridStartTime = msg.gridStartTime();
+ spi.gridStartTime = msg.gridStartTime();
for (TcpDiscoveryNode n : top) {
if (n.order() > 0)
@@ -1123,7 +1123,7 @@ class ClientImpl extends TcpDiscoveryImpl {
Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
if (data != null)
- adapter.onExchange(newNodeId, newNodeId, data, null);
+ spi.onExchange(newNodeId, newNodeId, data, null);
}
}
}
@@ -1132,7 +1132,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
- if (adapter.getSpiContext().isStopping())
+ if (spi.getSpiContext().isStopping())
return;
if (getLocalNodeId().equals(msg.nodeId())) {
@@ -1141,7 +1141,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (dataMap != null) {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- adapter.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null);
+ spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null);
}
locNode.setAttributes(msg.clientNodeAttributes());
@@ -1157,7 +1157,7 @@ class ClientImpl extends TcpDiscoveryImpl {
joinLatch.countDown();
- adapter.stats.onJoinFinished();
+ spi.stats.onJoinFinished();
}
else if (log.isDebugEnabled())
log.debug("Discarding node add finished message (this message has already been processed) " +
@@ -1178,8 +1178,8 @@ class ClientImpl extends TcpDiscoveryImpl {
node.order(topVer);
node.visible(true);
- if (adapter.locNodeVer.equals(node.version()))
- node.version(adapter.locNodeVer);
+ if (spi.locNodeVer.equals(node.version()))
+ node.version(spi.locNodeVer);
NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
@@ -1192,7 +1192,7 @@ class ClientImpl extends TcpDiscoveryImpl {
notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
- adapter.stats.onNodeJoined();
+ spi.stats.onNodeJoined();
}
}
@@ -1207,7 +1207,7 @@ class ClientImpl extends TcpDiscoveryImpl {
leaveLatch.countDown();
}
else {
- if (adapter.getSpiContext().isStopping())
+ if (spi.getSpiContext().isStopping())
return;
TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
@@ -1230,7 +1230,7 @@ class ClientImpl extends TcpDiscoveryImpl {
notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
- adapter.stats.onNodeLeft();
+ spi.stats.onNodeLeft();
}
}
@@ -1238,7 +1238,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
- if (adapter.getSpiContext().isStopping()) {
+ if (spi.getSpiContext().isStopping()) {
if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
if (leaveLatch.getCount() > 0) {
log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
@@ -1272,7 +1272,7 @@ class ClientImpl extends TcpDiscoveryImpl {
notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
- adapter.stats.onNodeFailed();
+ spi.stats.onNodeFailed();
}
}
@@ -1280,7 +1280,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
- if (adapter.getSpiContext().isStopping())
+ if (spi.getSpiContext().isStopping())
return;
if (getLocalNodeId().equals(msg.creatorNodeId())) {
@@ -1314,7 +1314,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
- if (adapter.getSpiContext().isStopping())
+ if (spi.getSpiContext().isStopping())
return;
if (getLocalNodeId().equals(msg.creatorNodeId())) {
@@ -1346,7 +1346,7 @@ class ClientImpl extends TcpDiscoveryImpl {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (msg.verified() && joinLatch.getCount() == 0) {
- DiscoverySpiListener lsnr = adapter.lsnr;
+ DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
UUID nodeId = msg.creatorNodeId();
@@ -1355,7 +1355,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (node != null && node.visible()) {
try {
- DiscoverySpiCustomMessage msgObj = msg.message(adapter.marsh);
+ DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh);
notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
}
@@ -1437,7 +1437,7 @@ class ClientImpl extends TcpDiscoveryImpl {
*/
private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
@Nullable DiscoverySpiCustomMessage data) {
- DiscoverySpiListener lsnr = adapter.lsnr;
+ DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0e192ef8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 184895b..a966363 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -230,36 +230,36 @@ class ServerImpl extends TcpDiscoveryImpl {
tcpSrvr = new TcpServer();
- adapter.initLocalNode(tcpSrvr.port, true);
+ spi.initLocalNode(tcpSrvr.port, true);
- locNode = adapter.locNode;
+ locNode = spi.locNode;
// Start TCP server thread after local node is initialized.
tcpSrvr.start();
ring.localNode(locNode);
- if (adapter.ipFinder.isShared())
+ if (spi.ipFinder.isShared())
registerLocalNodeAddress();
else {
- if (F.isEmpty(adapter.ipFinder.getRegisteredAddresses()))
+ if (F.isEmpty(spi.ipFinder.getRegisteredAddresses()))
throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
"GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
"(specify list of IP addresses in configuration).");
- ipFinderHasLocAddr = adapter.ipFinderHasLocalAddress();
+ ipFinderHasLocAddr = spi.ipFinderHasLocalAddress();
}
- if (adapter.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) {
+ if (spi.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) {
statsPrinter = new StatisticsPrinter();
statsPrinter.start();
}
- adapter.stats.onJoinStarted();
+ spi.stats.onJoinStarted();
joinTopology();
- adapter.stats.onJoinFinished();
+ spi.stats.onJoinFinished();
hbsSnd = new HeartbeatsSender();
hbsSnd.start();
@@ -267,12 +267,12 @@ class ServerImpl extends TcpDiscoveryImpl {
chkStatusSnd = new CheckStatusSender();
chkStatusSnd.start();
- if (adapter.ipFinder.isShared()) {
+ if (spi.ipFinder.isShared()) {
ipFinderCleaner = new IpFinderCleaner();
ipFinderCleaner.start();
}
- adapter.printStartInfo();
+ spi.printStartInfo();
}
/**
@@ -283,7 +283,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Make sure address registration succeeded.
while (true) {
try {
- adapter.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+ spi.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
// Success.
break;
@@ -341,9 +341,9 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
synchronized (mux) {
- long threshold = U.currentTimeMillis() + adapter.netTimeout;
+ long threshold = U.currentTimeMillis() + spi.netTimeout;
- long timeout = adapter.netTimeout;
+ long timeout = spi.netTimeout;
while (spiState != LEFT && timeout > 0) {
try {
@@ -400,9 +400,9 @@ class ServerImpl extends TcpDiscoveryImpl {
Collection<TcpDiscoveryNode> rmts = null;
if (!disconnect)
- adapter.printStopInfo();
+ spi.printStopInfo();
else {
- adapter.getSpiContext().deregisterPorts();
+ spi.getSpiContext().deregisterPorts();
rmts = ring.visibleRemoteNodes();
}
@@ -414,7 +414,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (rmts != null && !rmts.isEmpty()) {
// This is restart/disconnection and remote nodes are not empty.
// We need to fire FAIL event for each.
- DiscoverySpiListener lsnr = adapter.lsnr;
+ DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
Collection<ClusterNode> processed = new HashSet<>();
@@ -438,7 +438,7 @@ class ServerImpl extends TcpDiscoveryImpl {
printStatistics();
- adapter.stats.clear();
+ spi.stats.clear();
synchronized (mux) {
// Clear stored data.
@@ -498,7 +498,7 @@ class ServerImpl extends TcpDiscoveryImpl {
return false;
}
- for (InetSocketAddress addr : adapter.getNodeAddresses(node, U.sameMacs(locNode, node))) {
+ for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
try {
// ID returned by the node should be the same as ID of the parameter for ping to succeed.
IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
@@ -530,7 +530,7 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- if (F.contains(adapter.locNodeAddrs, addr)) {
+ if (F.contains(spi.locNodeAddrs, addr)) {
if (clientNodeId == null)
return F.t(getLocalNodeId(), false);
@@ -565,18 +565,18 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
Socket sock = null;
- for (int i = 0; i < adapter.reconCnt; i++) {
+ for (int i = 0; i < spi.reconCnt; i++) {
try {
if (addr.isUnresolved())
addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
long tstamp = U.currentTimeMillis();
- sock = adapter.openSocket(addr);
+ sock = spi.openSocket(addr);
- adapter.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+ spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
- TcpDiscoveryPingResponse res = adapter.readMessage(sock, null, adapter.netTimeout);
+ TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -585,7 +585,7 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+ spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
@@ -638,7 +638,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
try {
- msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, adapter.marsh.marshal(evt)));
+ msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, spi.marsh.marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -691,7 +691,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<String, Object> attrs = new HashMap<>(locNode.attributes());
attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT,
- adapter.ignite().configuration().getMarshaller().marshal(subj));
+ spi.ignite().configuration().getMarshaller().marshal(subj));
attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
locNode.setAttributes(attrs);
@@ -704,7 +704,7 @@ class ServerImpl extends TcpDiscoveryImpl {
locNode.order(1);
locNode.internalOrder(1);
- adapter.gridStartTime = U.currentTimeMillis();
+ spi.gridStartTime = U.currentTimeMillis();
locNode.visible(true);
@@ -729,9 +729,9 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Join request message has been sent (waiting for coordinator response).");
synchronized (mux) {
- long threshold = U.currentTimeMillis() + adapter.netTimeout;
+ long threshold = U.currentTimeMillis() + spi.netTimeout;
- long timeout = adapter.netTimeout;
+ long timeout = spi.netTimeout;
while (spiState == CONNECTING && timeout > 0) {
try {
@@ -749,15 +749,15 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spiState == CONNECTED)
break;
else if (spiState == DUPLICATE_ID)
- throw adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
+ throw spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
else if (spiState == AUTH_FAILED)
- throw adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
+ throw spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
else if (spiState == CHECK_FAILED)
- throw adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
+ throw spi.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
else if (spiState == LOOPBACK_PROBLEM) {
TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
- boolean locHostLoopback = adapter.locHost.isLoopbackAddress();
+ boolean locHostLoopback = spi.locHost.isLoopbackAddress();
String firstNode = locHostLoopback ? "local" : "remote";
@@ -774,7 +774,7 @@ class ServerImpl extends TcpDiscoveryImpl {
"Check remote nodes logs for possible error messages. " +
"Note that large topology may require significant time to start. " +
"Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
- "if getting this message on the starting nodes [networkTimeout=" + adapter.netTimeout + ']');
+ "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']');
}
}
@@ -796,13 +796,13 @@ class ServerImpl extends TcpDiscoveryImpl {
@SuppressWarnings({"BusyWait"})
private boolean sendJoinRequestMessage() throws IgniteSpiException {
TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
- adapter.collectExchangeData(getLocalNodeId()));
+ spi.collectExchangeData(getLocalNodeId()));
// Time when it has been detected, that addresses from IP finder do not respond.
long noResStart = 0;
while (true) {
- Collection<InetSocketAddress> addrs = adapter.resolvedAddresses();
+ Collection<InetSocketAddress> addrs = spi.resolvedAddresses();
if (F.isEmpty(addrs))
return false;
@@ -810,7 +810,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean retry = false;
Collection<Exception> errs = new ArrayList<>();
- try (SocketMultiConnector multiConnector = new SocketMultiConnector(adapter, addrs, 2)) {
+ try (SocketMultiConnector multiConnector = new SocketMultiConnector(spi, addrs, 2)) {
GridTuple3<InetSocketAddress, Socket, Exception> tuple;
while ((tuple = multiConnector.next()) != null) {
@@ -897,7 +897,7 @@ class ServerImpl extends TcpDiscoveryImpl {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
}
- else if (!adapter.ipFinder.isShared() && !ipFinderHasLocAddr) {
+ else if (!spi.ipFinder.isShared() && !ipFinderHasLocAddr) {
IgniteCheckedException e = null;
if (!errs.isEmpty()) {
@@ -912,10 +912,10 @@ class ServerImpl extends TcpDiscoveryImpl {
"(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
addrs);
- if (adapter.joinTimeout > 0) {
+ if (spi.joinTimeout > 0) {
if (noResStart == 0)
noResStart = U.currentTimeMillis();
- else if (U.currentTimeMillis() - noResStart > adapter.joinTimeout)
+ else if (U.currentTimeMillis() - noResStart > spi.joinTimeout)
throw new IgniteSpiException(
"Failed to connect to any address from IP finder within join timeout " +
"(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
@@ -952,7 +952,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Collection<Throwable> errs = null;
- long ackTimeout0 = adapter.ackTimeout;
+ long ackTimeout0 = spi.ackTimeout;
int connectAttempts = 1;
@@ -960,7 +960,7 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- for (int i = 0; i < adapter.reconCnt; i++) {
+ for (int i = 0; i < spi.reconCnt; i++) {
// Need to set to false on each new iteration,
// since remote node may leave in the middle of the first iteration.
joinReqSent = false;
@@ -971,14 +971,14 @@ class ServerImpl extends TcpDiscoveryImpl {
long tstamp = U.currentTimeMillis();
if (sock == null)
- sock = adapter.openSocket(addr);
+ sock = spi.openSocket(addr);
openSock = true;
// Handshake.
- adapter.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+ spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
- TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -987,14 +987,14 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+ spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
// Send message.
tstamp = U.currentTimeMillis();
- adapter.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg);
- adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+ spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
if (debugMode)
debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
@@ -1009,7 +1009,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// E.g. due to class not found issue.
joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
- return adapter.readReceipt(sock, ackTimeout0);
+ return spi.readReceipt(sock, ackTimeout0);
}
catch (ClassCastException e) {
// This issue is rarely reproducible on AmazonEC2, but never
@@ -1087,7 +1087,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
- adapter.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+ spi.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
node.setAttributes(attrs);
}
@@ -1110,7 +1110,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (credBytes == null)
return null;
- return adapter.marsh.unmarshal(credBytes, null);
+ return spi.marsh.unmarshal(credBytes, null);
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
@@ -1123,10 +1123,10 @@ class ServerImpl extends TcpDiscoveryImpl {
* maximum acknowledgement timeout, {@code false} otherwise.
*/
private boolean checkAckTimeout(long ackTimeout) {
- if (ackTimeout > adapter.maxAckTimeout) {
+ if (ackTimeout > spi.maxAckTimeout) {
LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
"(consider increasing 'maxAckTimeout' configuration property) " +
- "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + adapter.maxAckTimeout + ']');
+ "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
return false;
}
@@ -1145,7 +1145,7 @@ class ServerImpl extends TcpDiscoveryImpl {
assert type > 0;
assert node != null;
- DiscoverySpiListener lsnr = adapter.lsnr;
+ DiscoverySpiListener lsnr = spi.lsnr;
TcpDiscoverySpiState spiState = spiStateCopy();
@@ -1179,7 +1179,7 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.put(topVer, top);
- while (topHist.size() > adapter.topHistSize)
+ while (topHist.size() > spi.topHistSize)
topHist.remove(topHist.firstKey());
if (log.isDebugEnabled())
@@ -1200,7 +1200,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean crd = spiState == CONNECTED && locNode.equals(resolveCoordinator());
if (crd)
- adapter.stats.onBecomingCoordinator();
+ spi.stats.onBecomingCoordinator();
return crd;
}
@@ -1254,7 +1254,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* Prints SPI statistics.
*/
private void printStatistics() {
- if (log.isInfoEnabled() && adapter.statsPrintFreq > 0) {
+ if (log.isInfoEnabled() && spi.statsPrintFreq > 0) {
int failedNodesSize;
int leavingNodesSize;
@@ -1267,7 +1267,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryNode coord = resolveCoordinator();
- log.info("Discovery SPI statistics [statistics=" + adapter.stats + ", spiState=" + spiStateCopy() +
+ log.info("Discovery SPI statistics [statistics=" + spi.stats + ", spiState=" + spiStateCopy() +
", coord=" + coord +
", topSize=" + ring.allNodes().size() +
", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize +
@@ -1439,7 +1439,7 @@ class ServerImpl extends TcpDiscoveryImpl {
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
- b.append(" Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl());
+ b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
@@ -1473,7 +1473,7 @@ class ServerImpl extends TcpDiscoveryImpl {
b.append(U.nl());
- b.append("Stats: ").append(adapter.stats).append(U.nl());
+ b.append("Stats: ").append(spi.stats).append(U.nl());
U.quietAndInfo(log, b.toString());
}
@@ -1548,9 +1548,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* Constructor.
*/
private HeartbeatsSender() {
- super(adapter.ignite().name(), "tcp-disco-hb-sender", log);
+ super(spi.ignite().name(), "tcp-disco-hb-sender", log);
- setPriority(adapter.threadPri);
+ setPriority(spi.threadPri);
}
/** {@inheritDoc} */
@@ -1576,7 +1576,7 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(msg);
- Thread.sleep(adapter.hbFreq);
+ Thread.sleep(spi.hbFreq);
}
}
}
@@ -1592,9 +1592,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* Constructor.
*/
private CheckStatusSender() {
- super(adapter.ignite().name(), "tcp-disco-status-check-sender", log);
+ super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
- setPriority(adapter.threadPri);
+ setPriority(spi.threadPri);
}
/** {@inheritDoc} */
@@ -1604,7 +1604,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Status check sender has been started.");
// Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
- long checkTimeout = (long)adapter.maxMissedHbs * adapter.hbFreq + 50;
+ long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
long lastSent = 0;
@@ -1656,9 +1656,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* Constructor.
*/
private IpFinderCleaner() {
- super(adapter.ignite().name(), "tcp-disco-ip-finder-cleaner", log);
+ super(spi.ignite().name(), "tcp-disco-ip-finder-cleaner", log);
- setPriority(adapter.threadPri);
+ setPriority(spi.threadPri);
}
/** {@inheritDoc} */
@@ -1668,7 +1668,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("IP finder cleaner has been started.");
while (!isInterrupted()) {
- Thread.sleep(adapter.ipFinderCleanFreq);
+ Thread.sleep(spi.ipFinderCleanFreq);
if (!isLocalNodeCoordinator())
continue;
@@ -1680,7 +1680,7 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- if (adapter.ipFinder.isShared())
+ if (spi.ipFinder.isShared())
cleanIpFinder();
}
}
@@ -1689,7 +1689,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* Cleans IP finder.
*/
private void cleanIpFinder() {
- assert adapter.ipFinder.isShared();
+ assert spi.ipFinder.isShared();
try {
// Addresses that belong to nodes in the topology.
@@ -1698,7 +1698,7 @@ class ServerImpl extends TcpDiscoveryImpl {
ring.allNodes(),
new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() {
@Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) {
- return !node.isClient() ? adapter.getNodeAddresses(node) :
+ return !node.isClient() ? spi.getNodeAddresses(node) :
Collections.<InetSocketAddress>emptyList();
}
}
@@ -1706,7 +1706,7 @@ class ServerImpl extends TcpDiscoveryImpl {
);
// Addresses registered in IP finder.
- Collection<InetSocketAddress> regAddrs = adapter.registeredAddresses();
+ Collection<InetSocketAddress> regAddrs = spi.registeredAddresses();
// Remove all addresses that belong to alive nodes, leave dead-node addresses.
Collection<InetSocketAddress> rmvAddrs = F.view(
@@ -1742,7 +1742,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Unregister dead-nodes addresses.
if (!rmvAddrs.isEmpty()) {
- adapter.ipFinder.unregisterAddresses(rmvAddrs);
+ spi.ipFinder.unregisterAddresses(rmvAddrs);
if (log.isDebugEnabled())
log.debug("Unregistered addresses from IP finder: " + rmvAddrs);
@@ -1756,7 +1756,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Re-register missing addresses.
if (!missingAddrs.isEmpty()) {
- adapter.ipFinder.registerAddresses(missingAddrs);
+ spi.ipFinder.registerAddresses(missingAddrs);
if (log.isDebugEnabled())
log.debug("Registered missing addresses in IP finder: " + missingAddrs);
@@ -1897,7 +1897,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
- adapter.stats.onMessageProcessingStarted(msg);
+ spi.stats.onMessageProcessingStarted(msg);
if (msg instanceof TcpDiscoveryJoinRequestMessage)
processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
@@ -1938,7 +1938,7 @@ class ServerImpl extends TcpDiscoveryImpl {
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
- adapter.stats.onMessageProcessingFinished(msg);
+ spi.stats.onMessageProcessingFinished(msg);
}
/**
@@ -1952,7 +1952,7 @@ class ServerImpl extends TcpDiscoveryImpl {
assert ring.hasRemoteNodes();
- for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs)
+ for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
msgLsnr.apply(msg);
if (redirectToClients(msg)) {
@@ -1964,9 +1964,9 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
if (marshalledMsg == null)
- marshalledMsg = adapter.marsh.marshal(msg);
+ marshalledMsg = spi.marsh.marshal(msg);
- msgClone = adapter.marsh.unmarshal(marshalledMsg, null);
+ msgClone = spi.marsh.unmarshal(marshalledMsg, null);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal message: " + msg, e);
@@ -2043,8 +2043,8 @@ class ServerImpl extends TcpDiscoveryImpl {
List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
- addr: for (InetSocketAddress addr : adapter.getNodeAddresses(next, sameHost)) {
- long ackTimeout0 = adapter.ackTimeout;
+ addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) {
+ long ackTimeout0 = spi.ackTimeout;
if (locNodeAddrs.contains(addr)){
if (log.isDebugEnabled())
@@ -2054,7 +2054,7 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
- for (int i = 0; i < adapter.reconCnt; i++) {
+ for (int i = 0; i < spi.reconCnt; i++) {
if (sock == null) {
nextNodeExists = false;
@@ -2066,14 +2066,14 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = adapter.openSocket(addr);
+ sock = spi.openSocket(addr);
openSock = true;
// Handshake.
writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
- TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -2086,7 +2086,7 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+ spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
UUID nextId = res.creatorNodeId();
@@ -2214,9 +2214,9 @@ class ServerImpl extends TcpDiscoveryImpl {
clearNodeAddedMessage(pendingMsg);
}
- adapter.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
+ spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
- int res = adapter.readReceipt(sock, ackTimeout0);
+ int res = spi.readReceipt(sock, ackTimeout0);
if (log.isDebugEnabled())
log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2237,9 +2237,9 @@ class ServerImpl extends TcpDiscoveryImpl {
writeToSocket(sock, msg);
- adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+ spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
- int res = adapter.readReceipt(sock, ackTimeout0);
+ int res = spi.readReceipt(sock, ackTimeout0);
if (log.isDebugEnabled())
log.debug("Message has been sent to next node [msg=" + msg +
@@ -2382,10 +2382,10 @@ class ServerImpl extends TcpDiscoveryImpl {
private void registerPendingMessage(TcpDiscoveryAbstractMessage msg) {
assert msg != null;
- if (adapter.ensured(msg)) {
+ if (spi.ensured(msg)) {
pendingMsgs.add(msg);
- adapter.stats.onPendingMessageRegistered();
+ spi.stats.onPendingMessageRegistered();
if (log.isDebugEnabled())
log.debug("Pending message has been registered: " + msg.id());
@@ -2411,7 +2411,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// This check is performed by the node that the joining node is connected to, but not by the coordinator,
// because the loopback problem message is sent directly to the joining node, which may be unavailable
// if the coordinator resides on another host.
- if (adapter.locHost.isLoopbackAddress() != rmtHostLoopback) {
+ if (spi.locHost.isLoopbackAddress() != rmtHostLoopback) {
String firstNode = rmtHostLoopback ? "remote" : "local";
String secondNode = rmtHostLoopback ? "local" : "remote";
@@ -2510,7 +2510,7 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId,
- adapter.locHost));
+ spi.locHost));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
@@ -2541,7 +2541,7 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId,
- adapter.locHost));
+ spi.locHost));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
@@ -2557,7 +2557,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT,
- adapter.ignite().configuration().getMarshaller().marshal(subj));
+ spi.ignite().configuration().getMarshaller().marshal(subj));
node.setAttributes(attrs);
}
@@ -2578,7 +2578,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- IgniteNodeValidationResult err = adapter.getSpiContext().validateNode(node);
+ IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
if (err != null) {
boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
@@ -2666,7 +2666,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Internal order has been assigned to node: " + node);
TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
- node, msg.discoveryData(), adapter.gridStartTime);
+ node, msg.discoveryData(), spi.gridStartTime);
nodeAddedMsg.client(msg.client());
@@ -2711,7 +2711,7 @@ class ServerImpl extends TcpDiscoveryImpl {
IgniteSpiException ex = null;
- for (InetSocketAddress addr : adapter.getNodeAddresses(node, U.sameMacs(locNode, node))) {
+ for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
try {
sendMessageDirectly(msg, addr, null);
@@ -2760,7 +2760,7 @@ class ServerImpl extends TcpDiscoveryImpl {
assert node.isClient();
node.clientRouterNodeId(msg.routerNodeId());
- node.aliveCheck(adapter.maxMissedClientHbs);
+ node.aliveCheck(spi.maxMissedClientHbs);
if (isLocalNodeCoordinator()) {
Collection<TcpDiscoveryAbstractMessage> pending =
@@ -2826,7 +2826,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (isLocalNodeCoordinator()) {
if (msg.verified()) {
- adapter.stats.onRingMessageReceived(msg);
+ spi.stats.onRingMessageReceived(msg);
TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId,
node.id());
@@ -2882,7 +2882,7 @@ class ServerImpl extends TcpDiscoveryImpl {
else {
SecurityContext subj = nodeAuth.authenticateNode(node, cred);
- SecurityContext coordSubj = adapter.ignite().configuration().getMarshaller().unmarshal(
+ SecurityContext coordSubj = spi.ignite().configuration().getMarshaller().unmarshal(
node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT),
U.gridClassLoader());
@@ -2911,7 +2911,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (authFailed) {
try {
trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId,
- adapter.locHost));
+ spi.locHost));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
@@ -2929,7 +2929,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.client())
- node.aliveCheck(adapter.maxMissedClientHbs);
+ node.aliveCheck(spi.maxMissedClientHbs);
boolean topChanged = ring.add(node);
@@ -2939,9 +2939,9 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
if (data != null)
- adapter.onExchange(node.id(), node.id(), data, U.gridClassLoader());
+ spi.onExchange(node.id(), node.id(), data, U.gridClassLoader());
- msg.addDiscoveryData(locNodeId, adapter.collectExchangeData(node.id()));
+ msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
}
if (log.isDebugEnabled())
@@ -2959,7 +2959,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Collection<TcpDiscoveryNode> top = msg.topology();
if (top != null && !top.isEmpty()) {
- adapter.gridStartTime = msg.gridStartTime();
+ spi.gridStartTime = msg.gridStartTime();
for (TcpDiscoveryNode n : top) {
// Make all preceding nodes and local node visible.
@@ -3011,7 +3011,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Notify outside of synchronized block.
if (dataMap != null) {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- adapter.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
+ spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
}
}
@@ -3050,7 +3050,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (locNodeCoord) {
if (msg.verified()) {
- adapter.stats.onRingMessageReceived(msg);
+ spi.stats.onRingMessageReceived(msg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
@@ -3088,15 +3088,15 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
- adapter.stats.onNodeJoined();
+ spi.stats.onNodeJoined();
// Make sure that a node with a greater order never gets EVT_NODE_JOINED
// for a node with a lesser order.
assert node.internalOrder() > locNode.internalOrder() : "Invalid order [node=" + node +
", locNode=" + locNode + ", msg=" + msg + ", ring=" + ring + ']';
- if (adapter.locNodeVer.equals(node.version()))
- node.version(adapter.locNodeVer);
+ if (spi.locNodeVer.equals(node.version()))
+ node.version(spi.locNodeVer);
if (!locNodeCoord) {
boolean b = ring.topologyVersion(topVer);
@@ -3113,8 +3113,8 @@ class ServerImpl extends TcpDiscoveryImpl {
notifyDiscovery(EVT_NODE_JOINED, topVer, node);
try {
- if (adapter.ipFinder.isShared() && locNodeCoord)
- adapter.ipFinder.registerAddresses(node.socketAddresses());
+ if (spi.ipFinder.isShared() && locNodeCoord)
+ spi.ipFinder.registerAddresses(node.socketAddresses());
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
@@ -3172,9 +3172,9 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified() || !ring.hasRemoteNodes() || msg.senderNodeId() != null) {
- if (adapter.ipFinder.isShared() && !ring.hasRemoteNodes()) {
+ if (spi.ipFinder.isShared() && !ring.hasRemoteNodes()) {
try {
- adapter.ipFinder.unregisterAddresses(locNode.socketAddresses());
+ spi.ipFinder.unregisterAddresses(locNode.socketAddresses());
}
catch (IgniteSpiException e) {
U.error(log, "Failed to unregister local node address from IP finder.", e);
@@ -3222,7 +3222,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (locNodeCoord) {
if (msg.verified()) {
- adapter.stats.onRingMessageReceived(msg);
+ spi.stats.onRingMessageReceived(msg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
@@ -3297,7 +3297,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- adapter.stats.onNodeLeft();
+ spi.stats.onNodeLeft();
notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3392,7 +3392,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (locNodeCoord) {
if (msg.verified()) {
- adapter.stats.onRingMessageReceived(msg);
+ spi.stats.onRingMessageReceived(msg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
@@ -3442,7 +3442,7 @@ class ServerImpl extends TcpDiscoveryImpl {
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
- adapter.stats.onNodeFailed();
+ spi.stats.onNodeFailed();
}
if (ring.hasRemoteNodes())
@@ -3535,7 +3535,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null &&
- U.currentTimeMillis() - locNode.lastUpdateTime() < adapter.hbFreq) {
+ U.currentTimeMillis() - locNode.lastUpdateTime() < spi.hbFreq) {
if (log.isDebugEnabled())
log.debug("Status check message discarded (local node receives updates).");
@@ -3643,8 +3643,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null ||
!hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) {
// Message is on its first ring or just created on coordinator.
- msg.setMetrics(locNodeId, adapter.metricsProvider.metrics());
- msg.setCacheMetrics(locNodeId, adapter.metricsProvider.cacheMetrics());
+ msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
+ msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());
for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) {
UUID nodeId = e.getKey();
@@ -3665,7 +3665,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
if (clientNode.visible()) {
if (clientNodeIds.contains(clientNode.id()))
- clientNode.aliveCheck(adapter.maxMissedClientHbs);
+ clientNode.aliveCheck(spi.maxMissedClientHbs);
else {
int aliveCheck = clientNode.decrementAliveCheck();
@@ -3815,12 +3815,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (sndNext && ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
else {
- adapter.stats.onRingMessageReceived(msg);
+ spi.stats.onRingMessageReceived(msg);
DiscoverySpiCustomMessage msgObj = null;
try {
- msgObj = msg.message(adapter.marsh);
+ msgObj = msg.message(spi.marsh);
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -3832,7 +3832,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (nextMsg != null) {
try {
addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg,
- adapter.marsh.marshal(nextMsg)));
+ spi.marsh.marshal(nextMsg)));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery custom message.", e);
@@ -3856,7 +3856,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Custom message.
*/
private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
- DiscoverySpiListener lsnr = adapter.lsnr;
+ DiscoverySpiListener lsnr = spi.lsnr;
TcpDiscoverySpiState spiState = spiStateCopy();
@@ -3873,7 +3873,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (node != null) {
try {
- DiscoverySpiCustomMessage msgObj = msg.message(adapter.marsh);
+ DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh);
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
msg.topologyVersion(),
@@ -3883,7 +3883,7 @@ class ServerImpl extends TcpDiscoveryImpl {
msgObj);
if (msgObj.isMutable())
- msg.message(msgObj, adapter.marsh.marshal(msgObj));
+ msg.message(msgObj, spi.marsh.marshal(msgObj));
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -3912,35 +3912,35 @@ class ServerImpl extends TcpDiscoveryImpl {
* @throws IgniteSpiException In case of error.
*/
TcpServer() throws IgniteSpiException {
- super(adapter.ignite().name(), "tcp-disco-srvr", log);
+ super(spi.ignite().name(), "tcp-disco-srvr", log);
- setPriority(adapter.threadPri);
+ setPriority(spi.threadPri);
- for (port = adapter.locPort; port < adapter.locPort + adapter.locPortRange; port++) {
+ for (port = spi.locPort; port < spi.locPort + spi.locPortRange; port++) {
try {
- srvrSock = new ServerSocket(port, 0, adapter.locHost);
+ srvrSock = new ServerSocket(port, 0, spi.locHost);
break;
}
catch (IOException e) {
- if (port < adapter.locPort + adapter.locPortRange - 1) {
+ if (port < spi.locPort + spi.locPortRange - 1) {
if (log.isDebugEnabled())
log.debug("Failed to bind to local port (will try next port within range) " +
- "[port=" + port + ", localHost=" + adapter.locHost + ']');
+ "[port=" + port + ", localHost=" + spi.locHost + ']');
onException("Failed to bind to local port. " +
- "[port=" + port + ", localHost=" + adapter.locHost + ']', e);
+ "[port=" + port + ", localHost=" + spi.locHost + ']', e);
}
else {
throw new IgniteSpiException("Failed to bind TCP server socket (possibly all ports in range " +
- "are in use) [firstPort=" + adapter.locPort + ", lastPort=" + (adapter.locPort + adapter.locPortRange - 1) +
- ", addr=" + adapter.locHost + ']', e);
+ "are in use) [firstPort=" + spi.locPort + ", lastPort=" + (spi.locPort + spi.locPortRange - 1) +
+ ", addr=" + spi.locHost + ']', e);
}
}
}
if (log.isInfoEnabled())
- log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + adapter.locHost + ']');
+ log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + spi.locHost + ']');
}
/** {@inheritDoc} */
@@ -3962,7 +3962,7 @@ class ServerImpl extends TcpDiscoveryImpl {
reader.start();
}
- adapter.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp);
+ spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp);
}
}
catch (IOException e) {
@@ -4007,13 +4007,13 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param sock Socket to read data from.
*/
SocketReader(Socket sock) {
- super(adapter.ignite().name(), "tcp-disco-sock-reader", log);
+ super(spi.ignite().name(), "tcp-disco-sock-reader", log);
this.sock = sock;
- setPriority(adapter.threadPri);
+ setPriority(spi.threadPri);
- adapter.stats.onSocketReaderCreated();
+ spi.stats.onSocketReaderCreated();
}
/** {@inheritDoc} */
@@ -4032,9 +4032,9 @@ class ServerImpl extends TcpDiscoveryImpl {
int timeout = sock.getSoTimeout();
- sock.setSoTimeout((int)adapter.netTimeout);
+ sock.setSoTimeout((int)spi.netTimeout);
- for (IgniteInClosure<Socket> connLsnr : adapter.incomeConnLsnrs)
+ for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
connLsnr.apply(sock);
in = new BufferedInputStream(sock.getInputStream());
@@ -4077,11 +4077,11 @@ class ServerImpl extends TcpDiscoveryImpl {
// Restore timeout.
sock.setSoTimeout(timeout);
- TcpDiscoveryAbstractMessage msg = adapter.readMessage(sock, in, adapter.netTimeout);
+ TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
// Ping.
if (msg instanceof TcpDiscoveryPingRequest) {
- if (!adapter.isNodeStopping0()) {
+ if (!spi.isNodeStopping0()) {
TcpDiscoveryPingRequest req = (TcpDiscoveryPingRequest)msg;
TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
@@ -4093,7 +4093,7 @@ class ServerImpl extends TcpDiscoveryImpl {
res.clientExists(clientWorker.ping());
}
- adapter.writeToSocket(sock, res);
+ spi.writeToSocket(sock, res);
}
else if (log.isDebugEnabled())
log.debug("Ignore ping request, node is stopping.");
@@ -4111,7 +4111,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryHandshakeResponse res =
new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
- adapter.writeToSocket(sock, res);
+ spi.writeToSocket(sock, res);
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
// the local node sends a handshake request message on the loopback address, so we get here.
@@ -4169,7 +4169,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (e.hasCause(SocketTimeoutException.class))
LT.warn(log, null, "Socket operation timed out on handshake " +
"(consider increasing 'networkTimeout' configuration property) " +
- "[netTimeout=" + adapter.netTimeout + ']');
+ "[netTimeout=" + spi.netTimeout + ']');
else if (e.hasCause(ClassNotFoundException.class))
LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
@@ -4187,14 +4187,14 @@ class ServerImpl extends TcpDiscoveryImpl {
while (!isInterrupted()) {
try {
- TcpDiscoveryAbstractMessage msg = adapter.marsh.unmarshal(in, U.gridClassLoader());
+ TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
msg.senderNodeId(nodeId);
if (log.isDebugEnabled())
log.debug("Message has been received: " + msg);
- adapter.stats.onMessageReceived(msg);
+ spi.stats.onMessageReceived(msg);
if (debugMode && recordable(msg))
debugLog("Message has been received: " + msg);
@@ -4217,14 +4217,14 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
if (state == CONNECTED) {
- adapter.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(sock, RES_OK);
msgWorker.addMessage(msg);
continue;
}
else {
- adapter.writeToSocket(sock, RES_CONTINUE_JOIN);
+ spi.writeToSocket(sock, RES_CONTINUE_JOIN);
break;
}
@@ -4232,7 +4232,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
// Send receipt back.
- adapter.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(sock, RES_OK);
boolean ignored = false;
@@ -4261,7 +4261,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
// Send receipt back.
- adapter.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(sock, RES_OK);
boolean ignored = false;
@@ -4290,7 +4290,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
// Send receipt back.
- adapter.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(sock, RES_OK);
boolean ignored = false;
@@ -4319,7 +4319,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
// Send receipt back.
- adapter.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(sock, RES_OK);
boolean ignored = false;
@@ -4361,7 +4361,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send receipt back.
if (clientMsgWrk == null)
- adapter.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(sock, RES_OK);
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -4461,7 +4461,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
if (state == CONNECTED) {
- adapter.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(sock, RES_OK);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -4473,7 +4473,7 @@ class ServerImpl extends TcpDiscoveryImpl {
return true;
}
else {
- adapter.stats.onMessageProcessingStarted(msg);
+ spi.stats.onMessageProcessingStarted(msg);
Integer res;
@@ -4492,14 +4492,14 @@ class ServerImpl extends TcpDiscoveryImpl {
// Local node is stopping. Remote node should try next one.
res = RES_CONTINUE_JOIN;
- adapter.writeToSocket(sock, res);
+ spi.writeToSocket(sock, res);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
fromAddrs.addAll(msg.node().socketAddresses());
- adapter.stats.onMessageProcessingFinished(msg);
+ spi.stats.onMessageProcessingFinished(msg);
return false;
}
@@ -4522,7 +4522,7 @@ class ServerImpl extends TcpDiscoveryImpl {
readers.remove(this);
}
- adapter.stats.onSocketReaderRemoved();
+ spi.stats.onSocketReaderRemoved();
}
/** {@inheritDoc} */
@@ -4539,13 +4539,13 @@ class ServerImpl extends TcpDiscoveryImpl {
* Constructor.
*/
StatisticsPrinter() {
- super(adapter.ignite().name(), "tcp-disco-stats-printer", log);
+ super(spi.ignite().name(), "tcp-disco-stats-printer", log);
- assert adapter.statsPrintFreq > 0;
+ assert spi.statsPrintFreq > 0;
assert log.isInfoEnabled();
- setPriority(adapter.threadPri);
+ setPriority(spi.threadPri);
}
/** {@inheritDoc} */
@@ -4555,7 +4555,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Statistics printer has been started.");
while (!isInterrupted()) {
- Thread.sleep(adapter.statsPrintFreq);
+ Thread.sleep(spi.statsPrintFreq);
printStatistics();
}
@@ -4650,7 +4650,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*
*/
public boolean ping() throws InterruptedException {
- if (adapter.isNodeStopping0())
+ if (spi.isNodeStopping0())
return false;
GridFutureAdapter<Boolean> fut;
@@ -4675,7 +4675,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
try {
- return fut.get(adapter.ackTimeout, TimeUnit.MILLISECONDS);
+ return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS);
}
catch (IgniteInterruptedCheckedException ignored) {
throw new InterruptedException();
@@ -4718,9 +4718,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param name Thread name.
*/
protected MessageWorkerAdapter(String name) {
- super(adapter.ignite().name(), name, log);
+ super(spi.ignite().name(), name, log);
- setPriority(adapter.threadPri);
+ setPriority(spi.threadPri);
}
/** {@inheritDoc} */
@@ -4787,7 +4787,7 @@ class ServerImpl extends TcpDiscoveryImpl {
throws IOException, IgniteCheckedException {
bout.reset();
- adapter.writeToSocket(sock, msg, bout);
+ spi.writeToSocket(sock, msg, bout);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0e192ef8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 8dad92a..4836911 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -40,7 +40,7 @@ abstract class TcpDiscoveryImpl {
protected static final int RES_WAIT = 200;
/** */
- protected final TcpDiscoverySpi adapter;
+ protected final TcpDiscoverySpi spi;
/** */
protected final IgniteLogger log;
@@ -49,19 +49,19 @@ abstract class TcpDiscoveryImpl {
protected TcpDiscoveryNode locNode;
/**
- * @param adapter Adapter.
+ * @param spi Discovery SPI.
*/
- TcpDiscoveryImpl(TcpDiscoverySpi adapter) {
- this.adapter = adapter;
+ TcpDiscoveryImpl(TcpDiscoverySpi spi) {
+ this.spi = spi;
- log = adapter.log;
+ log = spi.log;
}
/**
*
*/
public UUID getLocalNodeId() {
- return adapter.getLocalNodeId();
+ return spi.getLocalNodeId();
}
/**
@@ -69,7 +69,7 @@ abstract class TcpDiscoveryImpl {
* @param e Exception.
*/
protected void onException(String msg, Exception e){
- adapter.getExceptionRegistry().onException(msg, e);
+ spi.getExceptionRegistry().onException(msg, e);
}
/**