You are viewing a plain-text version of this content. The canonical link to the original message is not preserved in this copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/18 14:49:50 UTC
incubator-ignite git commit: # fosters-debug
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1003-debug [created] 747b283e3
# fosters-debug
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/747b283e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/747b283e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/747b283e
Branch: refs/heads/ignite-1003-debug
Commit: 747b283e3bf2114a8a88912dae519e0d6ab13a15
Parents: 01eee2d
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 18 13:32:41 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 18 13:32:41 2015 +0300
----------------------------------------------------------------------
.../ignite/examples/ExampleNodeStartup.java | 13 +-
.../communication/tcp/TcpCommunicationSpi.java | 64 +++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 180 +++++++++++--------
3 files changed, 161 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/747b283e/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
index f972b53..bb154c5 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
@@ -18,6 +18,7 @@
package org.apache.ignite.examples;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
/**
* Starts up an empty node with example compute configuration.
@@ -29,7 +30,15 @@ public class ExampleNodeStartup {
* @param args Command line arguments, none required.
* @throws IgniteException If failed.
*/
- public static void main(String[] args) throws IgniteException {
- Ignition.start("examples/config/example-ignite.xml");
+ public static void main(String[] args) throws Exception {
+ try (Ignite ignite = Ignition.start("C:\\Users\\gridgain\\ignite\\apache\\incubator-ignite\\work\\test-client.xml")) {
+ ignite.destroyCache(null);
+
+ try (IgniteCache cache = ignite.createCache(new CacheConfiguration())) {
+ cache.put(1, 1);
+ }
+
+ Thread.sleep(Long.MAX_VALUE);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/747b283e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9e38788..38eb10d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -267,23 +267,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (!isNodeStopping()) {
GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
- if (!getSpiContext().tryFailNode(id)) {
- if (recoveryData != null) {
- if (recoveryData.nodeAlive(getSpiContext().node(id))) {
- if (!recoveryData.messagesFutures().isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Session was closed but there are unacknowledged messages, " +
- "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
-
- commWorker.addReconnectRequest(recoveryData);
- }
+ if (recoveryData != null) {
+ if (recoveryData.nodeAlive(getSpiContext().node(id))) {
+ if (!recoveryData.messagesFutures().isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Session was closed but there are unacknowledged messages, " +
+ "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
+
+ commWorker.addReconnectRequest(recoveryData);
}
- else
- recoveryData.onNodeLeft();
}
+ else
+ recoveryData.onNodeLeft();
}
- else
- recoveryData.onNodeLeft();
}
}
@@ -1876,6 +1872,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
+ log.info("Start connect [timeout=" + connTimeout +
+ ", rmtAddrs0=" + rmtAddrs0 +
+ ", rmtHostNames0=" + rmtHostNames0 +
+ ", port=" + boundPort +
+ ", extAddrs=" + extAddrs +
+ ", node=" + node + ']');
+
boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
boolean isExtAddrsExist = !F.isEmpty(extAddrs);
@@ -1884,18 +1887,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"TCP communication addresses or mapped external addresses. Check configuration and make sure " +
"that you use the same communication SPI on all nodes. Remote node id: " + node.id());
- List<InetSocketAddress> addrs;
+ LinkedHashSet<InetSocketAddress> addrs;
// Try to connect first on bound addresses.
if (isRmtAddrsExist) {
- addrs = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
+ List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
- Collections.sort(addrs, U.inetAddressesComparator(sameHost));
+ Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
+
+ addrs = new LinkedHashSet<>(addrs0);
}
else
- addrs = new ArrayList<>();
+ addrs = new LinkedHashSet<>();
// Then on mapped external addresses.
if (isExtAddrsExist)
@@ -1907,12 +1912,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
int connectAttempts = 1;
+ log.info("Try connect [timeout=" + connTimeout + ", nodeAddrs=" + addrs + ", node=" + node + ']');
+
+ long start = U.currentTimeMillis();
+
for (InetSocketAddress addr : addrs) {
long connTimeout0 = connTimeout;
int attempt = 1;
while (!conn) { // Reconnection on handshake timeout.
+ long start0 = U.currentTimeMillis();
+
+ log.info("Try connect to address [timeout=" + connTimeout0 + ", addr=" + addr + ", node=" + node + ']');
+
try {
SocketChannel ch = SocketChannel.open();
@@ -2048,12 +2061,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
break;
}
+ finally {
+ log.info("End connect to address [time=" + (U.currentTimeMillis() - start0) +
+ ", connected=" + (client != null) +
+ ", node=" + node +
+ ", addr=" + addr + ']');
+ }
}
if (conn)
break;
}
+ log.info("End connect [time=" + (U.currentTimeMillis() - start) +
+ ", connected=" + (client != null) +
+ ", node=" + node +
+ ", addrs=" + addrs + ']');
+
if (client == null) {
assert errs != null;
@@ -2610,7 +2634,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
ClusterNode node = recoveryDesc.node();
- if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+ if (clients.containsKey(node.id()) ||
+ !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
+ !getSpiContext().pingNode(node.id()))
return;
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/747b283e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 63f165d..a55d803 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -435,21 +435,30 @@ class ServerImpl extends TcpDiscoveryImpl {
return false;
}
- for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
- try {
- // ID returned by the node should be the same as ID of the parameter for ping to succeed.
- IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
+ log.info("Start ping node [node=" + node + ']');
- return node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']');
+ long start = U.currentTimeMillis();
- onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e);
- // continue;
+ try {
+ for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
+ try {
+ // ID returned by the node should be the same as ID of the parameter for ping to succeed.
+ IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
+
+ return node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']');
+
+ onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e);
+ // continue;
+ }
}
}
+ finally {
+ log.info("End ping node [time=" + (U.currentTimeMillis() - start) + ", node=" + node + ']');
+ }
return false;
}
@@ -463,102 +472,117 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
throws IgniteCheckedException {
- assert addr != null;
+ log.info("Start ping address [addr=" + addr + ", clientNodeId=" + clientNodeId + ']');
- UUID locNodeId = getLocalNodeId();
+ long start0 = U.currentTimeMillis();
- if (F.contains(spi.locNodeAddrs, addr)) {
- if (clientNodeId == null)
- return F.t(getLocalNodeId(), false);
+ try {
+ assert addr != null;
- ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);
+ UUID locNodeId = getLocalNodeId();
- if (clientWorker == null)
- return F.t(getLocalNodeId(), false);
+ if (F.contains(spi.locNodeAddrs, addr)) {
+ if (clientNodeId == null)
+ return F.t(getLocalNodeId(), false);
- boolean clientPingRes;
+ ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);
- try {
- clientPingRes = clientWorker.ping();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (clientWorker == null)
+ return F.t(getLocalNodeId(), false);
+
+ boolean clientPingRes;
+
+ try {
+ clientPingRes = clientWorker.ping();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
+ }
- throw new IgniteInterruptedCheckedException(e);
+ return F.t(getLocalNodeId(), clientPingRes);
}
- return F.t(getLocalNodeId(), clientPingRes);
- }
+ GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
- GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
+ IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
- IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
+ if (oldFut != null)
+ return oldFut.get();
+ else {
+ Collection<Throwable> errs = null;
- if (oldFut != null)
- return oldFut.get();
- else {
- Collection<Throwable> errs = null;
+ try {
+ Socket sock = null;
- try {
- Socket sock = null;
+ for (int i = 0; i < spi.reconCnt; i++) {
+ long start1 = U.currentTimeMillis();
- for (int i = 0; i < spi.reconCnt; i++) {
- try {
- if (addr.isUnresolved())
- addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
+ try {
+ log.info("Ping address [iter=" + i + ", addr=" + addr + ", clientNodeId=" + clientNodeId + ']');
- long tstamp = U.currentTimeMillis();
+ if (addr.isUnresolved())
+ addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
- sock = spi.openSocket(addr);
+ long tstamp = U.currentTimeMillis();
- spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+ sock = spi.openSocket(addr);
- TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);
+ spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
- if (locNodeId.equals(res.creatorNodeId())) {
- if (log.isDebugEnabled())
- log.debug("Ping response from local node: " + res);
+ TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);
- break;
- }
+ if (locNodeId.equals(res.creatorNodeId())) {
+ if (log.isDebugEnabled())
+ log.debug("Ping response from local node: " + res);
- spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+ break;
+ }
- IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
+ spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
- fut.onDone(t);
+ IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
- return t;
- }
- catch (IOException | IgniteCheckedException e) {
- if (errs == null)
- errs = new ArrayList<>();
+ fut.onDone(t);
- errs.add(e);
- }
- finally {
- U.closeQuiet(sock);
+ return t;
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (errs == null)
+ errs = new ArrayList<>();
+
+ errs.add(e);
+ }
+ finally {
+ U.closeQuiet(sock);
+
+ log.info("Ping address end [iter=" + i + ", time=" + (U.currentTimeMillis() - start1) + ", addr=" + addr + ", clientNodeId=" + clientNodeId + ']');
+ }
}
}
- }
- catch (Throwable t) {
- fut.onDone(t);
+ catch (Throwable t) {
+ fut.onDone(t);
- if (t instanceof Error)
- throw t;
+ if (t instanceof Error)
+ throw t;
- throw U.cast(t);
- }
- finally {
- if (!fut.isDone())
- fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));
+ throw U.cast(t);
+ }
+ finally {
+ if (!fut.isDone())
+ fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));
- boolean b = pingMap.remove(addr, fut);
+ boolean b = pingMap.remove(addr, fut);
- assert b;
- }
+ assert b;
+ }
- return fut.get();
+ return fut.get();
+ }
+ }
+ finally {
+ log.info("End ping address [time=" + (U.currentTimeMillis() - start0) + ", addr=" + addr + ", clientNodeId=" + clientNodeId + ']');
}
}
@@ -1596,6 +1620,10 @@ class ServerImpl extends TcpDiscoveryImpl {
Boolean res = pingResMap.get(addr);
if (res == null) {
+ log.info("Cleaner start ping node [addr=" + addr + ']');
+
+ long start = U.currentTimeMillis();
+
try {
res = pingNode(addr, null).get1() != null;
}
@@ -1608,6 +1636,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
finally {
pingResMap.put(addr, res);
+
+ log.info("Cleaner end ping node [time=" + (U.currentTimeMillis() - start) + ", addr=" + addr + ']');
}
}