You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/31 09:01:46 UTC
incubator-ignite git commit: IGNITE-1169 Fixed review notes.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1169 e7bd078b4 -> 381773897
IGNITE-1169 Fixed review notes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/38177389
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/38177389
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/38177389
Branch: refs/heads/ignite-1169
Commit: 38177389733707cd4e53026b08165b1772e66d97
Parents: e7bd078
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 31 10:01:34 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 31 10:01:34 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 27 ++++++++++----------
.../util/nio/GridCommunicationClient.java | 2 +-
.../util/nio/GridNioFinishedFuture.java | 5 ++--
.../ignite/internal/util/nio/GridNioFuture.java | 5 ++--
.../internal/util/nio/GridNioFutureImpl.java | 7 ++---
.../util/nio/GridNioRecoveryDescriptor.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 2 +-
.../util/nio/GridShmemCommunicationClient.java | 5 +++-
.../util/nio/GridTcpNioCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 8 +++---
...mmunicationSpiRecoveryAckFutureSelfTest.java | 19 +++++++-------
11 files changed, 44 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 272950e..33e8c1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -987,7 +987,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
boolean ordered,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<Exception> ackClosure
+ IgniteInClosure<IgniteException> ackClosure
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
@@ -1017,7 +1017,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
try {
if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
- ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessageWithAck(node, ioMsg, ackClosure);
+ ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure);
else
getSpi().sendMessage(node, ioMsg);
}
@@ -1161,11 +1161,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
+ * @param ackClosure Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendWithAck(ClusterNode node, Object topic, Message msg, byte plc)
+ public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, null);
+ send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
}
/**
@@ -1188,8 +1189,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param ackClosure Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendWithAck(ClusterNode node, GridTopic topic, Message msg, byte plc,
- IgniteInClosure<Exception> ackClosure) throws IgniteCheckedException {
+ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
+ IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException {
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure);
}
@@ -1225,14 +1226,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param ackClosure Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendOrderedMessageWithAck(
+ public void sendOrderedMessage(
ClusterNode node,
Object topic,
Message msg,
byte plc,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<Exception> ackClosure
+ IgniteInClosure<IgniteException> ackClosure
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
@@ -1246,7 +1247,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @param timeout Timeout to keep a message on receiving queue.
* @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackClosure Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void sendOrderedMessage(
@@ -1255,8 +1255,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Message msg,
byte plc,
long timeout,
- boolean skipOnTimeout,
- IgniteInClosure<Exception> ackClosure
+ boolean skipOnTimeout
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
@@ -1265,7 +1264,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
}
/**
@@ -1278,14 +1277,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param ackClosure Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendOrderedMessageWithAck(
+ public void sendOrderedMessage(
UUID nodeId,
Object topic,
Message msg,
byte plc,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<Exception> ackClosure
+ IgniteInClosure<IgniteException> ackClosure
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 336aab9..1a26ad5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -99,7 +99,7 @@ public interface GridCommunicationClient {
* @throws IgniteCheckedException If failed.
* @return {@code True} if should try to resend message.
*/
- public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<Exception> closure)
+ public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure)
throws IgniteCheckedException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
index 21cf17c..aac238a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -58,12 +59,12 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G
}
/** {@inheritDoc} */
- @Override public void ackClosure(IgniteInClosure<Exception> closure) {
+ @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
// No-op.
}
/** {@inheritDoc} */
- @Override public IgniteInClosure<Exception> ackClosure() {
+ @Override public IgniteInClosure<IgniteException> ackClosure() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index 2b77089..5a884f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
@@ -46,10 +47,10 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
*
* @param closure Ack closure.
*/
- public void ackClosure(IgniteInClosure<Exception> closure);
+ public void ackClosure(IgniteInClosure<IgniteException> closure);
/**
* @return Ack closure.
*/
- public IgniteInClosure<Exception> ackClosure();
+ public IgniteInClosure<IgniteException> ackClosure();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
index 847b7d6..e71bf92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -32,7 +33,7 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
protected boolean msgThread;
/** */
- protected IgniteInClosure<Exception> ackClosure;
+ protected IgniteInClosure<IgniteException> ackClosure;
/** {@inheritDoc} */
@Override public void messageThread(boolean msgThread) {
@@ -50,12 +51,12 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
}
/** {@inheritDoc} */
- @Override public void ackClosure(IgniteInClosure<Exception> closure) {
+ @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
ackClosure = closure;
}
/** {@inheritDoc} */
- @Override public IgniteInClosure<Exception> ackClosure() {
+ @Override public IgniteInClosure<IgniteException> ackClosure() {
return ackClosure;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index e528361..a21600b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -368,7 +368,7 @@ public class GridNioRecoveryDescriptor {
((GridNioFutureImpl)msg).onDone(e);
if (msg.ackClosure() != null)
- msg.ackClosure().apply(e);
+ msg.ackClosure().apply(new IgniteException(e));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index f4a27fa..5c4916e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -392,7 +392,7 @@ public class GridNioServer<T> {
int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
- IgniteInClosure<Exception> ackClosure;
+ IgniteInClosure<IgniteException> ackClosure;
if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
fut.ackClosure(ackClosure);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 9cf87c6..67d4664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -115,7 +115,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
/** {@inheritDoc} */
@Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg,
- IgniteInClosure<Exception> closure)
+ IgniteInClosure<IgniteException> closure)
throws IgniteCheckedException {
if (closed())
throw new IgniteCheckedException("Communication client was closed: " + this);
@@ -133,6 +133,9 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
markUsed();
+ if (closure != null)
+ closure.apply(null);
+
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 4122e48..7933001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -100,7 +100,7 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
}
/** {@inheritDoc} */
- @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<Exception> closure)
+ @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure)
throws IgniteCheckedException {
// Node ID is never provided in asynchronous send mode.
assert nodeId == null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b055eff..4cb59fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1697,7 +1697,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
- sendMessage(node, msg, null);
+ sendMessage0(node, msg, null);
}
/**
@@ -1712,9 +1712,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Note that this is not guaranteed that failed communication will result
* in thrown exception as this is dependant on SPI implementation.
*/
- public void sendMessageWithAck(ClusterNode node, Message msg, IgniteInClosure<Exception> ackClosure)
+ public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
throws IgniteSpiException {
- sendMessage(node, msg, ackClosure);
+ sendMessage0(node, msg, ackClosure);
}
/**
@@ -1725,7 +1725,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Note that this is not guaranteed that failed communication will result
* in thrown exception as this is dependant on SPI implementation.
*/
- private void sendMessage(ClusterNode node, Message msg, IgniteInClosure<Exception> ackClosure)
+ private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
throws IgniteSpiException {
assert node != null;
assert msg != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38177389/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
index 3f788ba..c082b4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.spi.communication.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.nio.*;
@@ -143,8 +142,8 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
final AtomicInteger ackMsgs = new AtomicInteger(0);
- IgniteInClosure<Exception> ackClosure = new CI1<Exception>() {
- @Override public void apply(Exception o) {
+ IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException o) {
assert o == null;
ackMsgs.incrementAndGet();
@@ -152,9 +151,9 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
};
for (int j = 0; j < msgPerIter; j++) {
- spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
- spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure);
+ spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure);
}
expMsgs += msgPerIter;
@@ -260,8 +259,8 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
final AtomicInteger ackMsgs = new AtomicInteger(0);
- IgniteInClosure<Exception> ackClosure = new CI1<Exception>() {
- @Override public void apply(Exception o) {
+ IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException o) {
assert o == null;
ackMsgs.incrementAndGet();
@@ -271,7 +270,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
int msgId = 0;
// Send message to establish connection.
- spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
// Prevent node1 from send
GridTestUtils.setFieldValue(srv1, "skipWrite", true);
@@ -279,7 +278,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
final GridNioSession ses0 = communicationSession(spi0);
for (int i = 0; i < 150; i++)
- spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
// Wait when session is closed because of queue overflow.
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -293,7 +292,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
GridTestUtils.setFieldValue(srv1, "skipWrite", false);
for (int i = 0; i < 100; i++)
- spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+ spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
final int expMsgs = 251;