You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/29 18:09:21 UTC
incubator-ignite git commit: IGNITE-1169 Implement on
TcpCommunicationSpi sendWithAck methods. Added tests.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1169 [created] 325458125
IGNITE-1169 Implement on TcpCommunicationSpi sendWithAck methods. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/32545812
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/32545812
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/32545812
Branch: refs/heads/ignite-1169
Commit: 325458125fe5cb34c1cf79b3e7ece161b6cff05c
Parents: f82fb5c
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Jul 29 19:50:10 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Jul 29 19:09:00 2015 +0300
----------------------------------------------------------------------
.../util/nio/GridCommunicationClient.java | 11 +
.../util/nio/GridNioRecoveryDescriptor.java | 54 ++-
.../util/nio/GridShmemCommunicationClient.java | 8 +
.../util/nio/GridTcpNioCommunicationClient.java | 32 ++
.../communication/tcp/TcpCommunicationSpi.java | 70 ++-
...mmunicationSpiRecoveryAckFutureSelfTest.java | 447 +++++++++++++++++++
.../IgniteSpiCommunicationSelfTestSuite.java | 1 +
7 files changed, 619 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 693a5a4..0403272 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.jetbrains.annotations.*;
@@ -100,6 +101,16 @@ public interface GridCommunicationClient {
public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
/**
+ * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
+ * @param msg Message to send.
+ * @param fut Future which done when will be received ack on the message.
+ * @throws IgniteCheckedException If failed.
+ * @return {@code True} if should try to resend message.
+ */
+ public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, GridFutureAdapter<Boolean> fut)
+ throws IgniteCheckedException;
+
+ /**
* @return {@code True} if send is asynchronous.
*/
public boolean async();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 733ae81..66ae60f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
@@ -36,6 +37,9 @@ public class GridNioRecoveryDescriptor {
/** Unacknowledged message futures. */
private final ArrayDeque<GridNioFuture<?>> msgFuts;
+ /** Unacknowledged message futures. */
+ private final Map<GridNioFuture<?>, GridFutureAdapter<Boolean>> ackFuts;
+
/** Number of messages to resend. */
private int resendCnt;
@@ -79,6 +83,7 @@ public class GridNioRecoveryDescriptor {
assert queueLimit > 0;
msgFuts = new ArrayDeque<>(queueLimit);
+ ackFuts = new HashMap<>(queueLimit);
this.queueLimit = queueLimit;
this.node = node;
@@ -166,6 +171,16 @@ public class GridNioRecoveryDescriptor {
}
/**
+ * @param nioFut fut NIO future.
+ * @param fut ack future.
+ */
+ public void add(GridNioFuture<?> nioFut, GridFutureAdapter<Boolean> fut) {
+ assert fut != null;
+
+ ackFuts.put(nioFut, fut);
+ }
+
+ /**
* @param rcvCnt Number of messages received by remote node.
*/
public void ackReceived(long rcvCnt) {
@@ -173,6 +188,8 @@ public class GridNioRecoveryDescriptor {
log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt +
", msgFuts=" + msgFuts.size() + ']');
+ GridFutureAdapter<Boolean> ackFut;
+
while (acked < rcvCnt) {
GridNioFuture<?> fut = msgFuts.pollFirst();
@@ -183,6 +200,12 @@ public class GridNioRecoveryDescriptor {
assert fut.isDone() : fut;
acked++;
+
+ if (!ackFuts.isEmpty() && (ackFut = ackFuts.get(fut)) != null) {
+ ackFut.onDone(true);
+
+ ackFuts.remove(fut);
+ }
}
}
@@ -191,6 +214,7 @@ public class GridNioRecoveryDescriptor {
*/
public void onNodeLeft() {
GridNioFuture<?>[] futs = null;
+ GridFutureAdapter<?>[] akFuts = null;
synchronized (this) {
nodeLeft = true;
@@ -200,10 +224,16 @@ public class GridNioRecoveryDescriptor {
msgFuts.clear();
}
+
+ if (!reserved && !ackFuts.isEmpty()) {
+ akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]);
+
+ ackFuts.clear();
+ }
}
if (futs != null)
- completeOnNodeLeft(futs);
+ completeOnNodeLeft(futs, akFuts);
}
/**
@@ -214,6 +244,13 @@ public class GridNioRecoveryDescriptor {
}
/**
+ * @return Futures for unacknowledged messages.
+ */
+ public Collection<GridFutureAdapter<Boolean>> ackMessageFutures() {
+ return ackFuts.values();
+ }
+
+ /**
* @param node Node.
* @return {@code True} if node is not null and has the same order as initial remtoe node.
*/
@@ -278,6 +315,7 @@ public class GridNioRecoveryDescriptor {
*/
public void release() {
GridNioFuture<?>[] futs = null;
+ GridFutureAdapter<?>[] akFuts = null;
synchronized (this) {
connected = false;
@@ -302,10 +340,16 @@ public class GridNioRecoveryDescriptor {
msgFuts.clear();
}
+
+ if (nodeLeft && !ackFuts.isEmpty()) {
+ akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]);
+
+ ackFuts.clear();
+ }
}
if (futs != null)
- completeOnNodeLeft(futs);
+ completeOnNodeLeft(futs, akFuts);
}
/**
@@ -356,10 +400,14 @@ public class GridNioRecoveryDescriptor {
/**
* @param futs Futures to complete.
+ * @param ackFuts Ack futures to complete.
*/
- private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
+ private void completeOnNodeLeft(GridNioFuture<?>[] futs, GridFutureAdapter<?>[] ackFuts) {
for (GridNioFuture<?> msg : futs)
((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id()));
+
+ for (GridFutureAdapter<?> fut : ackFuts)
+ fut.onDone(new IOException("Failed to send message, node has left: " + node.id()));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index e05c37a..134d271 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -135,6 +137,12 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
}
/** {@inheritDoc} */
+ @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg,
+ GridFutureAdapter<Boolean> fut) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index abad875..834371f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.extensions.communication.*;
@@ -123,6 +124,37 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
}
/** {@inheritDoc} */
+ @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg,
+ GridFutureAdapter<Boolean> fut) throws IgniteCheckedException {
+ // Node ID is never provided in asynchronous send mode.
+ assert nodeId == null;
+
+ GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+
+ GridNioFuture<?> nioFut = ses.send(msg);
+
+ if (nioFut.isDone()) {
+ try {
+ nioFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
+
+ if (e.getCause() instanceof IOException)
+ return true;
+ else
+ throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
+ }
+ }
+
+ if (recovery != null)
+ recovery.add(nioFut, fut);
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean async() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a0acb5c..53c6ddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1350,7 +1350,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) {
U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit " +
"(slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit +
- ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
+ ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
}
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
@@ -1749,6 +1749,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ *
+ * @param node
+ * @param msg
+ * @return
+ * @throws IgniteSpiException
+ */
+ public IgniteInternalFuture<Boolean> sendMessageWithAck(ClusterNode node, Message msg) throws IgniteSpiException {
+ assert node != null;
+ assert msg != null;
+
+ if (log.isTraceEnabled())
+ log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
+
+ IgniteInternalFuture<Boolean> fut = null;
+
+ if (node.id().equals(getLocalNode().id())) {
+ notifyListener(node.id(), msg, NOOP);
+
+ fut = new GridFinishedFuture<>(true);
+ }
+ else {
+ GridCommunicationClient client = null;
+
+ try {
+ boolean retry;
+
+ do {
+ client = reserveClient(node);
+
+ UUID nodeId = null;
+
+ if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
+ nodeId = node.id();
+
+ fut = new GridFutureAdapter<>();
+
+ retry = client.sendMessageWithAck(nodeId, msg, (GridFutureAdapter)fut);
+
+ client.release();
+
+ client = null;
+
+ if (!retry)
+ sentMsgsCnt.increment();
+ else {
+ ClusterNode node0 = getSpiContext().node(node.id());
+
+ if (node0 == null)
+ throw new IgniteCheckedException("Failed to send message to remote node " +
+ "(node has left the grid): " + node.id());
+ }
+ }
+ while (retry);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
+ }
+ finally {
+ if (client != null && clients.remove(node.id(), client))
+ client.forceClose();
+ }
+ }
+
+ return fut;
+ }
+
+ /**
* Returns existing or just created client to node.
*
* @param node Node to which client should be open.
@@ -2086,6 +2153,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
}
+
if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
new file mode 100644
index 0000000..2c2ef2e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.*;
+import org.apache.ignite.testframework.junits.spi.*;
+
+import org.eclipse.jetty.util.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+ /** */
+ private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
+
+ /** */
+ protected static final List<TcpCommunicationSpi> spis = new ArrayList<>();
+
+ /** */
+ protected static final List<ClusterNode> nodes = new ArrayList<>();
+
+ /** */
+ private static final int SPI_CNT = 2;
+
+ /**
+ *
+ */
+ static {
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
+ }
+
+ /**
+ * Disable SPI auto-start.
+ */
+ public GridTcpCommunicationSpiRecoveryAckFutureSelfTest() {
+ super(false);
+ }
+
+ /** */
+ @SuppressWarnings({"deprecation"})
+ private class TestListener implements CommunicationListener<Message> {
+ /** */
+ private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
+
+ /** */
+ private AtomicInteger rcvCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
+ info("Test listener received message: " + msg);
+
+ assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
+
+ GridTestMessage msg0 = (GridTestMessage)msg;
+
+ assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId()));
+
+ rcvCnt.incrementAndGet();
+
+ msgC.run();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(UUID nodeId) {
+ // No-op.
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAckOnIdle() throws Exception {
+ checkAck(10, 2000, 9);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAckOnCount() throws Exception {
+ checkAck(10, 60_000, 10);
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param msgPerIter Messages per iteration.
+ * @throws Exception If failed.
+ */
+ private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
+ createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
+
+ try {
+ TcpCommunicationSpi spi0 = spis.get(0);
+ TcpCommunicationSpi spi1 = spis.get(1);
+
+ ClusterNode node0 = nodes.get(0);
+ ClusterNode node1 = nodes.get(1);
+
+ int msgId = 0;
+
+ int expMsgs = 0;
+
+ for (int i = 0; i < 5; i++) {
+ info("Iteration: " + i);
+
+ List<IgniteInternalFuture<Boolean>> futs = new ArrayList<>();
+
+ for (int j = 0; j < msgPerIter; j++) {
+ futs.add(spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)));
+
+ futs.add(spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0)));
+ }
+
+ expMsgs += msgPerIter;
+
+ for (TcpCommunicationSpi spi : spis) {
+ GridNioServer srv = U.field(spi, "nioSrvr");
+
+ Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ assertFalse(sessions.isEmpty());
+
+ boolean found = false;
+
+ for (GridNioSession ses : sessions) {
+ final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+
+ if (recoveryDesc != null) {
+ found = true;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return recoveryDesc.messagesFutures().isEmpty();
+ }
+ }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 :
+ 10_000);
+
+ assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
+ recoveryDesc.messagesFutures().size());
+
+ assertEquals("Unexpected ack messages: " + recoveryDesc.ackMessageFutures(), 0,
+ recoveryDesc.ackMessageFutures().size());
+
+ break;
+ }
+ }
+
+ assertTrue(found);
+ }
+
+ final int expMsgs0 = expMsgs;
+
+ for (TcpCommunicationSpi spi : spis) {
+ final TestListener lsnr = (TestListener)spi.getListener();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ return lsnr.rcvCnt.get() >= expMsgs0;
+ }
+ }, 5000);
+
+ assertEquals(expMsgs, lsnr.rcvCnt.get());
+ }
+
+ for (IgniteInternalFuture<Boolean> f : futs)
+ assert f.get();
+ }
+ }
+ finally {
+ stopSpis();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueOverflow() throws Exception {
+ for (int i = 0; i < 3; i++) {
+ try {
+ startSpis(5, 60_000, 10);
+
+ checkOverflow();
+
+ break;
+ }
+ catch (IgniteCheckedException e) {
+ if (e.hasCause(BindException.class)) {
+ if (i < 2) {
+ info("Got exception caused by BindException, will retry after delay: " + e);
+
+ stopSpis();
+
+ U.sleep(10_000);
+ }
+ else
+ throw e;
+ }
+ else
+ throw e;
+ }
+ finally {
+ stopSpis();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkOverflow() throws Exception {
+ TcpCommunicationSpi spi0 = spis.get(0);
+ TcpCommunicationSpi spi1 = spis.get(1);
+
+ ClusterNode node0 = nodes.get(0);
+ ClusterNode node1 = nodes.get(1);
+
+ final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+
+ int msgId = 0;
+
+ // Send message to establish connection.
+ spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+ // Prevent node1 from send
+ GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+
+ final GridNioSession ses0 = communicationSession(spi0);
+
+ for (int i = 0; i < 150; i++)
+ spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+ // Wait when session is closed because of queue overflow.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ses0.closeTime() != 0;
+ }
+ }, 5000);
+
+ assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
+
+ GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+
+ for (int i = 0; i < 100; i++)
+ spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+ final int expMsgs = 251;
+
+ final TestListener lsnr = (TestListener)spi1.getListener();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ return lsnr.rcvCnt.get() >= expMsgs;
+ }
+ }, 5000);
+
+ assertEquals(expMsgs, lsnr.rcvCnt.get());
+ }
+
+ /**
+ * @param spi SPI.
+ * @return Session.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
+ final GridNioServer srv = U.field(spi, "nioSrvr");
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ return !sessions.isEmpty();
+ }
+ }, 5000);
+
+ Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ assertEquals(1, sessions.size());
+
+ return sessions.iterator().next();
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param queueLimit Message queue limit.
+ * @return SPI instance.
+ */
+ protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) {
+ TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+ spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+ spi.setIdleConnectionTimeout(idleTimeout);
+ spi.setTcpNoDelay(true);
+ spi.setAckSendThreshold(ackCnt);
+ spi.setMessageQueueLimit(queueLimit);
+ spi.setSharedMemoryPort(-1);
+
+ return spi;
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param queueLimit Message queue limit.
+ * @throws Exception If failed.
+ */
+ private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+ spis.clear();
+ nodes.clear();
+ spiRsrcs.clear();
+
+ Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+
+ for (int i = 0; i < SPI_CNT; i++) {
+ TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
+
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
+ IgniteTestResources rsrcs = new IgniteTestResources();
+
+ GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+
+ GridSpiTestContext ctx = initSpiContext();
+
+ ctx.setLocalNode(node);
+
+ spiRsrcs.add(rsrcs);
+
+ rsrcs.inject(spi);
+
+ spi.setListener(new TestListener());
+
+ node.setAttributes(spi.getNodeAttributes());
+
+ nodes.add(node);
+
+ spi.spiStart(getTestGridName() + (i + 1));
+
+ spis.add(spi);
+
+ spi.onContextInitialized(ctx);
+
+ ctxs.put(node, ctx);
+ }
+
+ // For each context set remote nodes.
+ for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) {
+ for (ClusterNode n : nodes) {
+ if (!n.equals(e.getKey()))
+ e.getValue().remoteNodes().add(n);
+ }
+ }
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param queueLimit Message queue limit.
+ * @throws Exception If failed.
+ */
+ private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+ for (int i = 0; i < 3; i++) {
+ try {
+ startSpis(ackCnt, idleTimeout, queueLimit);
+
+ break;
+ }
+ catch (IgniteCheckedException e) {
+ if (e.hasCause(BindException.class)) {
+ if (i < 2) {
+ info("Failed to start SPIs because of BindException, will retry after delay.");
+
+ stopSpis();
+
+ U.sleep(10_000);
+ }
+ else
+ throw e;
+ }
+ else
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void stopSpis() throws Exception {
+ for (CommunicationSpi<Message> spi : spis) {
+ spi.onContextDestroyed();
+
+ spi.setListener(null);
+
+ spi.spiStop();
+ }
+
+ for (IgniteTestResources rsrcs : spiRsrcs)
+ rsrcs.stopThreads();
+
+ spis.clear();
+ nodes.clear();
+ spiRsrcs.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index ff86bda..dcb8058 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -32,6 +32,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Communication SPI Test Suite");
suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckFutureSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));