You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/08/31 23:25:08 UTC
[45/50] [abbrv] ignite git commit: IGNITE-1169 Implemented send with
ack methods on TcpCommunication and GridIoManager. Added tests.
IGNITE-1169 Implemented send with ack methods on TcpCommunication and GridIoManager. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c10ade5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c10ade5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c10ade5
Branch: refs/heads/ignite-950
Commit: 1c10ade5a50c505ef5ed574ae7001ef7e779cf2e
Parents: aec9764
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 31 16:34:24 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 31 16:34:53 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 108 ++++-
.../util/nio/GridCommunicationClient.java | 5 +-
.../util/nio/GridNioFinishedFuture.java | 12 +
.../ignite/internal/util/nio/GridNioFuture.java | 14 +
.../internal/util/nio/GridNioFutureImpl.java | 15 +
.../util/nio/GridNioRecoveryDescriptor.java | 13 +-
.../ignite/internal/util/nio/GridNioServer.java | 5 +
.../util/nio/GridNioSessionMetaKey.java | 5 +-
.../util/nio/GridShmemCommunicationClient.java | 7 +-
.../util/nio/GridTcpNioCommunicationClient.java | 14 +-
.../communication/tcp/TcpCommunicationSpi.java | 43 +-
...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++
.../IgniteSpiCommunicationSelfTestSuite.java | 1 +
13 files changed, 685 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c1fb79a..7e17efc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.thread.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -971,6 +972,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param ordered Ordered flag.
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
+ * @param ackClosure Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
private void send(
@@ -981,7 +983,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
byte plc,
boolean ordered,
long timeout,
- boolean skipOnTimeout
+ boolean skipOnTimeout,
+ IgniteInClosure<IgniteException> ackClosure
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
@@ -1001,13 +1004,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
processOrderedMessage(locNodeId, ioMsg, plc, null);
else
processRegularMessage0(ioMsg, locNodeId);
+
+ if (ackClosure != null)
+ ackClosure.apply(null);
}
else {
if (topicOrd < 0)
ioMsg.topicBytes(marsh.marshal(topic));
try {
- getSpi().sendMessage(node, ioMsg);
+ if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
+ ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure);
+ else
+ getSpi().sendMessage(node, ioMsg);
}
catch (IgniteSpiException e) {
throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
@@ -1050,7 +1059,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
}
/**
@@ -1062,7 +1071,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false);
+ send(node, topic, -1, msg, plc, false, 0, false, null);
}
/**
@@ -1074,7 +1083,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
}
/**
@@ -1096,7 +1105,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
}
/**
@@ -1123,11 +1132,24 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
}
/**
- * @param nodes Destination nodes.
+ * @param node Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param ackClosure Ack closure.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
+ IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException {
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure);
+ }
+
+ /**
+ * @param nodes Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
@@ -1150,7 +1172,20 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param nodes Destination nodes.
+ * @param node Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param ackClosure Ack closure.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure)
+ throws IgniteCheckedException {
+ send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
+ }
+
+ /**
+ * @param nodes Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
@@ -1182,6 +1217,30 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @param ackClosure Ack closure.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ ClusterNode node,
+ Object topic,
+ Message msg,
+ byte plc,
+ long timeout,
+ boolean skipOnTimeout,
+ IgniteInClosure<IgniteException> ackClosure
+ ) throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+ }
+
+ /**
* Sends a peer deployable user message.
*
* @param nodes Destination nodes.
@@ -1301,6 +1360,35 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * @param nodeId Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @param ackClosure Ack closure.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ UUID nodeId,
+ Object topic,
+ Message msg,
+ byte plc,
+ long timeout,
+ boolean skipOnTimeout,
+ IgniteInClosure<IgniteException> ackClosure
+ ) throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+ }
+
+ /**
* @param nodes Destination nodes.
* @param topic Topic to send the message to.
* @param topicOrd Topic ordinal value.
@@ -1334,7 +1422,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// messages to one node vs. many.
if (!nodes.isEmpty()) {
for (ClusterNode node : nodes)
- send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout);
+ send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
}
else if (log.isDebugEnabled())
log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 693a5a4..1a26ad5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.jetbrains.annotations.*;
@@ -94,10 +95,12 @@ public interface GridCommunicationClient {
/**
* @param nodeId Node ID (provided only if versions of local and remote nodes are different).
* @param msg Message to send.
+ * @param closure Ack closure.
* @throws IgniteCheckedException If failed.
* @return {@code True} if should try to resend message.
*/
- public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
+ public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure)
+ throws IgniteCheckedException;
/**
* @return {@code True} if send is asynchronous.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
index 9029dd2..aac238a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
/**
* Future that represents already completed result.
@@ -57,6 +59,16 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G
}
/** {@inheritDoc} */
+ @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInClosure<IgniteException> ackClosure() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNioFinishedFuture.class, this, super.toString());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index 7101f45..5a884f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
/**
* NIO future.
@@ -39,4 +41,16 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
* @return {@code True} if skip recovery for this operation.
*/
public boolean skipRecovery();
+
+ /**
+ * Sets ack closure which will be applied when ack recevied.
+ *
+ * @param closure Ack closure.
+ */
+ public void ackClosure(IgniteInClosure<IgniteException> closure);
+
+ /**
+ * @return Ack closure.
+ */
+ public IgniteInClosure<IgniteException> ackClosure();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
index c5393c4..e71bf92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
/**
* Default future implementation.
@@ -30,6 +32,9 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
/** */
protected boolean msgThread;
+ /** */
+ protected IgniteInClosure<IgniteException> ackClosure;
+
/** {@inheritDoc} */
@Override public void messageThread(boolean msgThread) {
this.msgThread = msgThread;
@@ -46,6 +51,16 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
}
/** {@inheritDoc} */
+ @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
+ ackClosure = closure;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInClosure<IgniteException> ackClosure() {
+ return ackClosure;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNioFutureImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 733ae81..a7ed02a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -182,6 +182,9 @@ public class GridNioRecoveryDescriptor {
assert fut.isDone() : fut;
+ if (fut.ackClosure() != null)
+ fut.ackClosure().apply(null);
+
acked++;
}
}
@@ -358,8 +361,14 @@ public class GridNioRecoveryDescriptor {
* @param futs Futures to complete.
*/
private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
- for (GridNioFuture<?> msg : futs)
- ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id()));
+ for (GridNioFuture<?> msg : futs) {
+ IOException e = new IOException("Failed to send message, node has left: " + node.id());
+
+ ((GridNioFutureImpl)msg).onDone(e);
+
+ if (msg.ackClosure() != null)
+ msg.ackClosure().apply(new IgniteException(e));
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index ed55101..c180837 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -394,6 +394,11 @@ public class GridNioServer<T> {
int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
+ IgniteInClosure<IgniteException> ackClosure;
+
+ if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
+ fut.ackClosure(ackClosure);
+
if (ses.closed()) {
if (ses.removeFuture(fut))
fut.connectionClosed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index 004c327..23c1e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -45,7 +45,10 @@ public enum GridNioSessionMetaKey {
MSG_WRITER,
/** SSL engine. */
- SSL_ENGINE;
+ SSL_ENGINE,
+
+ /** Ack closure. */
+ ACK_CLOSURE;
/** Maximum count of NIO session keys in system. */
public static final int MAX_KEYS_CNT = 64;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index e05c37a..67d4664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.jetbrains.annotations.*;
@@ -113,7 +114,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
}
/** {@inheritDoc} */
- @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg)
+ @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg,
+ IgniteInClosure<IgniteException> closure)
throws IgniteCheckedException {
if (closed())
throw new IgniteCheckedException("Communication client was closed: " + this);
@@ -131,6 +133,9 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
markUsed();
+ if (closure != null)
+ closure.apply(null);
+
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index abad875..7933001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.jetbrains.annotations.*;
@@ -27,6 +28,8 @@ import java.io.*;
import java.nio.*;
import java.util.*;
+import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
+
/**
* Grid client for NIO server.
*/
@@ -97,11 +100,14 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
}
/** {@inheritDoc} */
- @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg)
+ @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure)
throws IgniteCheckedException {
// Node ID is never provided in asynchronous send mode.
assert nodeId == null;
+ if (closure != null)
+ ses.addMeta(ACK_CLOSURE.ordinal(), closure);
+
GridNioFuture<?> fut = ses.send(msg);
if (fut.isDone()) {
@@ -109,6 +115,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
fut.get();
}
catch (IgniteCheckedException e) {
+ if (closure != null)
+ ses.removeMeta(ACK_CLOSURE.ordinal());
+
if (log.isDebugEnabled())
log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
@@ -119,6 +128,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
}
}
+ if (closure != null)
+ ses.removeMeta(ACK_CLOSURE.ordinal());
+
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1c74d59..b706edf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1620,6 +1620,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* Creates new shared memory communication server.
+ *
* @return Server.
* @throws IgniteCheckedException If failed.
*/
@@ -1785,11 +1786,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ sendMessage0(node, msg, null);
+ }
+
+ /**
+ * Sends given message to destination node. Note that characteristics of the
+ * exchange such as durability, guaranteed delivery or error notification is
+ * dependant on SPI implementation.
+ *
+ * @param node Destination node.
+ * @param msg Message to send.
+ * @param ackClosure Ack closure.
+ * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
+ * Note that this is not guaranteed that failed communication will result
+ * in thrown exception as this is dependant on SPI implementation.
+ */
+ public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ throws IgniteSpiException {
+ sendMessage0(node, msg, ackClosure);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param msg Message to send.
+ * @param ackClosure Ack closure.
+ * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
+ * Note that this is not guaranteed that failed communication will result
+ * in thrown exception as this is dependant on SPI implementation.
+ */
+ private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ throws IgniteSpiException {
assert node != null;
assert msg != null;
if (log.isTraceEnabled())
- log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
+ log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
ClusterNode localNode = getLocalNode();
@@ -1813,7 +1844,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (!client.async() && !localNode.version().equals(node.version()))
nodeId = node.id();
- retry = client.sendMessage(nodeId, msg);
+ retry = client.sendMessage(nodeId, msg, ackClosure);
client.release();
@@ -1876,7 +1907,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient old = clients.put(nodeId, client0);
assert old == null : "Client already created " +
- "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
+ "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
if (client0 instanceof GridTcpNioCommunicationClient) {
GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
@@ -1979,7 +2010,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException {
+ @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node,
+ Integer port) throws IgniteCheckedException {
int attempt = 1;
int connectAttempts = 1;
@@ -2204,6 +2236,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
}
+
if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt);
@@ -2433,7 +2466,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else if (log.isDebugEnabled())
log.debug("Received remote node ID: " + rmtNodeId0);
- if (isSslEnabled() ) {
+ if (isSslEnabled()) {
assert sslHnd != null;
ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
new file mode 100644
index 0000000..e353f2d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.*;
+import org.apache.ignite.testframework.junits.spi.*;
+
+import org.eclipse.jetty.util.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
+ extends GridSpiAbstractTest<T> {
+ /** */
+ private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
+
+ /** */
+ protected static final List<TcpCommunicationSpi> spis = new ArrayList<>();
+
+ /** */
+ protected static final List<ClusterNode> nodes = new ArrayList<>();
+
+ /** */
+ private static final int SPI_CNT = 2;
+
+ /**
+ *
+ */
+ static {
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
+ }
+
+ /**
+ * Disable SPI auto-start.
+ */
+ public IgniteTcpCommunicationRecoveryAckClosureSelfTest() {
+ super(false);
+ }
+
+ /** */
+ @SuppressWarnings({"deprecation"})
+ private class TestListener implements CommunicationListener<Message> {
+ /** */
+ private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
+
+ /** */
+ private AtomicInteger rcvCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
+ info("Test listener received message: " + msg);
+
+ assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
+
+ GridTestMessage msg0 = (GridTestMessage)msg;
+
+ assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId()));
+
+ rcvCnt.incrementAndGet();
+
+ msgC.run();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(UUID nodeId) {
+ // No-op.
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAckOnIdle() throws Exception {
+ checkAck(10, 2000, 9);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAckOnCount() throws Exception {
+ checkAck(10, 60_000, 10);
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param msgPerIter Messages per iteration.
+ * @throws Exception If failed.
+ */
+ private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
+ createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
+
+ try {
+ TcpCommunicationSpi spi0 = spis.get(0);
+ TcpCommunicationSpi spi1 = spis.get(1);
+
+ ClusterNode node0 = nodes.get(0);
+ ClusterNode node1 = nodes.get(1);
+
+ int msgId = 0;
+
+ int expMsgs = 0;
+
+ for (int i = 0; i < 5; i++) {
+ info("Iteration: " + i);
+
+ final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+ IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException o) {
+ assert o == null;
+
+ ackMsgs.incrementAndGet();
+ }
+ };
+
+ for (int j = 0; j < msgPerIter; j++) {
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+ spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure);
+ }
+
+ expMsgs += msgPerIter;
+
+ for (TcpCommunicationSpi spi : spis) {
+ GridNioServer srv = U.field(spi, "nioSrvr");
+
+ Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ assertFalse(sessions.isEmpty());
+
+ boolean found = false;
+
+ for (GridNioSession ses : sessions) {
+ final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+
+ if (recoveryDesc != null) {
+ found = true;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return recoveryDesc.messagesFutures().isEmpty();
+ }
+ }, 10_000);
+
+ assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
+ recoveryDesc.messagesFutures().size());
+
+ break;
+ }
+ }
+
+ assertTrue(found);
+ }
+
+ final int expMsgs0 = expMsgs;
+
+ for (TcpCommunicationSpi spi : spis) {
+ final TestListener lsnr = (TestListener)spi.getListener();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ return lsnr.rcvCnt.get() >= expMsgs0;
+ }
+ }, 5000);
+
+ assertEquals(expMsgs, lsnr.rcvCnt.get());
+ }
+
+ assertEquals(msgPerIter * 2, ackMsgs.get());
+ }
+ }
+ finally {
+ stopSpis();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueOverflow() throws Exception {
+ for (int i = 0; i < 3; i++) {
+ try {
+ startSpis(5, 60_000, 10);
+
+ checkOverflow();
+
+ break;
+ }
+ catch (IgniteCheckedException e) {
+ if (e.hasCause(BindException.class)) {
+ if (i < 2) {
+ info("Got exception caused by BindException, will retry after delay: " + e);
+
+ stopSpis();
+
+ U.sleep(10_000);
+ }
+ else
+ throw e;
+ }
+ else
+ throw e;
+ }
+ finally {
+ stopSpis();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkOverflow() throws Exception {
+ TcpCommunicationSpi spi0 = spis.get(0);
+ TcpCommunicationSpi spi1 = spis.get(1);
+
+ ClusterNode node0 = nodes.get(0);
+ ClusterNode node1 = nodes.get(1);
+
+ final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+
+ final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+ IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException o) {
+ assert o == null;
+
+ ackMsgs.incrementAndGet();
+ }
+ };
+
+ int msgId = 0;
+
+ // Send message to establish connection.
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+ // Prevent node1 from send
+ GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+
+ final GridNioSession ses0 = communicationSession(spi0);
+
+ for (int i = 0; i < 150; i++)
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+ // Wait when session is closed because of queue overflow.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ses0.closeTime() != 0;
+ }
+ }, 5000);
+
+ assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
+
+ GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+
+ for (int i = 0; i < 100; i++)
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+ final int expMsgs = 251;
+
+ final TestListener lsnr = (TestListener)spi1.getListener();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr.rcvCnt.get() >= expMsgs;
+ }
+ }, 5000);
+
+ assertEquals(expMsgs, lsnr.rcvCnt.get());
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return expMsgs == ackMsgs.get();
+ }
+ }, 5000);
+ }
+
+ /**
+ * @param spi SPI.
+ * @return Session.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
+ final GridNioServer srv = U.field(spi, "nioSrvr");
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ return !sessions.isEmpty();
+ }
+ }, 5000);
+
+ Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ assertEquals(1, sessions.size());
+
+ return sessions.iterator().next();
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param queueLimit Message queue limit.
+ * @return SPI instance.
+ */
+ protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) {
+ TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+ spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+ spi.setIdleConnectionTimeout(idleTimeout);
+ spi.setTcpNoDelay(true);
+ spi.setAckSendThreshold(ackCnt);
+ spi.setMessageQueueLimit(queueLimit);
+ spi.setSharedMemoryPort(-1);
+
+ return spi;
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param queueLimit Message queue limit.
+ * @throws Exception If failed.
+ */
+ private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+ spis.clear();
+ nodes.clear();
+ spiRsrcs.clear();
+
+ Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+
+ for (int i = 0; i < SPI_CNT; i++) {
+ TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
+
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
+ IgniteTestResources rsrcs = new IgniteTestResources();
+
+ GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+
+ GridSpiTestContext ctx = initSpiContext();
+
+ ctx.setLocalNode(node);
+
+ spiRsrcs.add(rsrcs);
+
+ rsrcs.inject(spi);
+
+ spi.setListener(new TestListener());
+
+ node.setAttributes(spi.getNodeAttributes());
+
+ nodes.add(node);
+
+ spi.spiStart(getTestGridName() + (i + 1));
+
+ spis.add(spi);
+
+ spi.onContextInitialized(ctx);
+
+ ctxs.put(node, ctx);
+ }
+
+ // For each context set remote nodes.
+ for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) {
+ for (ClusterNode n : nodes) {
+ if (!n.equals(e.getKey()))
+ e.getValue().remoteNodes().add(n);
+ }
+ }
+ }
+
+ /**
+ * @param ackCnt Recovery acknowledgement count.
+ * @param idleTimeout Idle connection timeout.
+ * @param queueLimit Message queue limit.
+ * @throws Exception If failed.
+ */
+ private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+ for (int i = 0; i < 3; i++) {
+ try {
+ startSpis(ackCnt, idleTimeout, queueLimit);
+
+ break;
+ }
+ catch (IgniteCheckedException e) {
+ if (e.hasCause(BindException.class)) {
+ if (i < 2) {
+ info("Failed to start SPIs because of BindException, will retry after delay.");
+
+ stopSpis();
+
+ U.sleep(10_000);
+ }
+ else
+ throw e;
+ }
+ else
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void stopSpis() throws Exception {
+ for (CommunicationSpi<Message> spi : spis) {
+ spi.onContextDestroyed();
+
+ spi.setListener(null);
+
+ spi.spiStop();
+ }
+
+ for (IgniteTestResources rsrcs : spiRsrcs)
+ rsrcs.stopThreads();
+
+ spis.clear();
+ nodes.clear();
+ spiRsrcs.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c10ade5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 3f71d7d..9b43204 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -32,6 +32,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Communication SPI Test Suite");
suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class));
+ suite.addTest(new TestSuite(IgniteTcpCommunicationRecoveryAckClosureSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));