You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/18 18:12:27 UTC
[38/46] incubator-ignite git commit: Squashed commit of the
IGNITE-1229
Squashed commit of the IGNITE-1229
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7635e589
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7635e589
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7635e589
Branch: refs/heads/ignite-843
Commit: 7635e5894df6aab477b82253451b729985f632be
Parents: 1f00c70
Author: Denis Magda <dm...@gridgain.com>
Authored: Mon Aug 17 16:41:03 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Aug 17 16:41:03 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 57 ++++++-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 45 ++++--
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 152 ++++++++++++++++++-
.../TcpDiscoverySpiFailureTimeoutSelfTest.java | 8 +-
4 files changed, 245 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 76144e3..40e110f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -114,7 +114,7 @@ class ServerImpl extends TcpDiscoveryImpl {
protected TcpDiscoverySpiState spiState = DISCONNECTED;
/** Map with proceeding ping requests. */
- private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+ private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap =
new ConcurrentHashMap8<>();
/**
@@ -497,9 +497,9 @@ class ServerImpl extends TcpDiscoveryImpl {
return F.t(getLocalNodeId(), clientPingRes);
}
- GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
+ GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridPingFutureAdapter<>();
- IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
+ GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
if (oldFut != null)
return oldFut.get();
@@ -520,7 +520,11 @@ class ServerImpl extends TcpDiscoveryImpl {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr, timeoutHelper);
+ sock = spi.createSocket();
+
+ fut.sock = sock;
+
+ sock = spi.openSocket(sock, addr, timeoutHelper);
openedSock = true;
@@ -597,6 +601,21 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ /**
+ * Interrupts all existed 'ping' request for the given node.
+ *
+ * @param node Node that may be pinged.
+ */
+ private void interruptPing(TcpDiscoveryNode node) {
+ for (InetSocketAddress addr : spi.getNodeAddresses(node)) {
+ GridPingFutureAdapter fut = pingMap.get(addr);
+
+ if (fut != null && fut.sock != null)
+ // Reference to the socket is not set to null. No need to assign it to a local variable.
+ U.closeQuiet(fut.sock);
+ }
+ }
+
/** {@inheritDoc} */
@Override public void disconnect() throws IgniteSpiException {
spiStop0(true);
@@ -3366,6 +3385,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified() && !locNodeId.equals(leavingNodeId)) {
TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId);
+ interruptPing(leavingNode);
+
assert leftNode != null;
if (log.isDebugEnabled())
@@ -3533,6 +3554,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
node = ring.removeNode(nodeId);
+ interruptPing(node);
+
assert node != null;
long topVer;
@@ -5142,4 +5165,30 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.writeToSocket(sock, msg, bout, timeout);
}
}
+
+ /**
+ *
+ */
+ private static class GridPingFutureAdapter<R> extends GridFutureAdapter<R> {
+ /** Socket. */
+ private volatile Socket sock;
+
+ /**
+ * Returns socket associated with this ping future.
+ *
+ * @return Socket or {@code null} if no socket associated.
+ */
+ public Socket sock() {
+ return sock;
+ }
+
+ /**
+ * Associates socket with this ping future.
+ *
+ * @param sock Socket.
+ */
+ public void sock(Socket sock) {
+ this.sock = sock;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 18a540c..2f3d410 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1167,18 +1167,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @param timeoutHelper Timeout helper.
* @return Opened socket.
* @throws IOException If failed.
+ * @throws IgniteSpiOperationTimeoutException In case of timeout.
*/
protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {
- assert sockAddr != null;
+ return openSocket(createSocket(), sockAddr, timeoutHelper);
+ }
- InetSocketAddress resolved = sockAddr.isUnresolved() ?
- new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
+ /**
+ * Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established.
+ *
+ * @param sock Socket bound to a local host address.
+ * @param remAddr Remote address.
+ * @param timeoutHelper Timeout helper.
+ * @return Connected socket.
+ * @throws IOException If failed.
+ * @throws IgniteSpiOperationTimeoutException In case of timeout.
+ */
+ protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+ throws IOException, IgniteSpiOperationTimeoutException {
+
+ assert remAddr != null;
+
+ InetSocketAddress resolved = remAddr.isUnresolved() ?
+ new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr;
InetAddress addr = resolved.getAddress();
assert addr != null;
+ sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
+
+ writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
+
+ return sock;
+ }
+
+ /**
+ * Creates socket binding it to a local host address. This operation is not blocking.
+ *
+ * @return Created socket.
+ * @throws IOException If failed.
+ */
+ Socket createSocket() throws IOException {
Socket sock;
if (isSslEnabled())
@@ -1190,10 +1221,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
sock.setTcpNoDelay(true);
- sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
-
- writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
-
return sock;
}
@@ -1250,8 +1277,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @throws IOException If IO failed or write timed out.
* @throws IgniteCheckedException If marshalling failed.
*/
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
- throws IOException, IgniteCheckedException {
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+ IgniteCheckedException {
writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 9a44c24..2b404c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -70,7 +70,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi spi = new TcpDiscoverySpi();
+ TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
+ new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
discoMap.put(gridName, spi);
@@ -128,6 +129,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
if (U.isMacOs())
spi.setLocalAddress(F.first(U.allLocalIps()));
}
+ else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
+ cfg.setFailureDetectionTimeout(30_000);
return cfg;
}
@@ -339,6 +342,153 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If any error occurs.
*/
+ public void testPingInterruptedOnNodeFailed() throws Exception {
+ try {
+ final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
+ final Ignite failedNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
+ startGrid("testPingInterruptedOnNodeFailedSimpleNode");
+
+ ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true;
+
+ final CountDownLatch pingLatch = new CountDownLatch(1);
+
+ final CountDownLatch eventLatch = new CountDownLatch(1);
+
+ final AtomicBoolean pingRes = new AtomicBoolean(true);
+
+ final AtomicBoolean failRes = new AtomicBoolean(false);
+
+ long startTs = System.currentTimeMillis();
+
+ pingingNode.events().localListen(
+ new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) {
+ failRes.set(true);
+ eventLatch.countDown();
+ }
+
+ return true;
+ }
+ },
+ EventType.EVT_NODE_FAILED);
+
+ IgniteInternalFuture<?> pingFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ pingLatch.countDown();
+
+ pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
+ failedNode.cluster().localNode().id()));
+
+ return null;
+ }
+ }, 1);
+
+ IgniteInternalFuture<?> failingFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ pingLatch.await();
+
+ Thread.sleep(3000);
+
+ ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).simulateNodeFailure();
+
+ return null;
+ }
+ }, 1);
+
+ failingFut.get();
+ pingFut.get();
+
+ assertFalse(pingRes.get());
+
+ assertTrue(System.currentTimeMillis() - startTs <
+ pingingNode.configuration().getFailureDetectionTimeout() / 2);
+
+ assertTrue(eventLatch.await(7, TimeUnit.SECONDS));
+ assertTrue(failRes.get());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testPingInterruptedOnNodeLeft() throws Exception {
+ try {
+ final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
+ final Ignite leftNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
+ startGrid("testPingInterruptedOnNodeFailedSimpleNode");
+
+ ((TestTcpDiscoverySpi)leftNode.configuration().getDiscoverySpi()).ignorePingResponse = true;
+
+ final CountDownLatch pingLatch = new CountDownLatch(1);
+
+ final AtomicBoolean pingRes = new AtomicBoolean(true);
+
+ long startTs = System.currentTimeMillis();
+
+ IgniteInternalFuture<?> pingFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ pingLatch.countDown();
+
+ pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
+ leftNode.cluster().localNode().id()));
+
+ return null;
+ }
+ }, 1);
+
+ IgniteInternalFuture<?> stoppingFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ pingLatch.await();
+
+ Thread.sleep(3000);
+
+ stopGrid("testPingInterruptedOnNodeFailedFailingNode");
+
+ return null;
+ }
+ }, 1);
+
+ stoppingFut.get();
+ pingFut.get();
+
+ assertFalse(pingRes.get());
+
+ assertTrue(System.currentTimeMillis() - startTs <
+ pingingNode.configuration().getFailureDetectionTimeout() / 2);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ private boolean ignorePingResponse;
+
+ /** {@inheritDoc} */
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+ IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
+ return;
+ else
+ super.writeToSocket(sock, msg, timeout);
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
public void testNodeAdded() throws Exception {
try {
final Ignite g1 = startGrid(1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7635e589/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
index fbea187..df36644 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -305,7 +305,8 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
/** {@inheritDoc} */
- @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+ @Override protected Socket openSocket(Socket sock, InetSocketAddress sockAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {
if (openSocketTimeout) {
@@ -330,11 +331,12 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
}
}
- Socket sock = super.openSocket(sockAddr, timeoutHelper);
+ super.openSocket(sock, sockAddr, timeoutHelper);
try {
Thread.sleep(1500);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
// Ignore
}