You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 15:09:36 UTC
[2/8] ignite git commit: ignite-4705 Atomic cache protocol change:
notify client node from backups
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index d108b56..7af6139 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -24,6 +24,7 @@ import java.nio.ByteOrder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
/**
@@ -201,7 +203,10 @@ public class IpcToNioAdapter<T> {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+ Object msg,
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC) {
assert ses == IpcToNioAdapter.this.ses;
return send((Message)msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
index 7987d3d..f110cf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.util.nio;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
/**
* Verifies that first bytes received in accepted (incoming)
@@ -73,9 +75,10 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
@Override public GridNioFuture<?> onSessionWrite(
GridNioSession ses,
Object msg,
- boolean fut
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
- return proceedSessionWrite(ses, msg, fut);
+ return proceedSessionWrite(ses, msg, fut, ackC);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index 5d90cdb..d55bc54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.util.nio;
import java.util.concurrent.Executor;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerPool;
+import org.apache.ignite.lang.IgniteInClosure;
/**
* Enables multithreaded notification of session opened, message received and session closed events.
@@ -110,9 +112,10 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
@Override public GridNioFuture<?> onSessionWrite(
GridNioSession ses,
Object msg,
- boolean fut
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
- return proceedSessionWrite(ses, msg, fut);
+ return proceedSessionWrite(ses, msg, fut, ackC);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
index 343e625..b81086a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.util.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
/**
* Filter that transforms byte buffers to user-defined objects and vice-versa
@@ -82,16 +84,17 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
@Override public GridNioFuture<?> onSessionWrite(
GridNioSession ses,
Object msg,
- boolean fut
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
// No encoding needed in direct mode.
if (directMode)
- return proceedSessionWrite(ses, msg, fut);
+ return proceedSessionWrite(ses, msg, fut, ackC);
try {
ByteBuffer res = parser.encode(ses, msg);
- return proceedSessionWrite(ses, res, fut);
+ return proceedSessionWrite(ses, res, fut, ackC);
}
catch (IOException e) {
throw new GridNioException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
index be77d39..eab4909 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
@@ -31,6 +31,13 @@ public class GridNioEmbeddedFuture<R> extends GridNioFutureImpl<R> {
private static final long serialVersionUID = 0L;
/**
+ *
+ */
+ public GridNioEmbeddedFuture() {
+ super(null);
+ }
+
+ /**
* Callback to notify that future is finished.
* This method must delegate to {@link #onDone(GridNioFuture, Throwable)} method.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
index f7928c4..9163a4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
/**
* This interface defines the general element in transformation chain between the nio server and
@@ -106,13 +108,15 @@ public interface GridNioFilter {
* @param ses Session instance.
* @param msg Message to send.
* @param fut {@code True} if write future should be created.
+ * @param ackC Closure invoked when message ACK is received.
* @return Write future or {@code null}.
* @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
*/
public GridNioFuture<?> proceedSessionWrite(
GridNioSession ses,
Object msg,
- boolean fut
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException;
/**
@@ -155,10 +159,14 @@ public interface GridNioFilter {
* @param ses Session on which message should be written.
* @param msg Message being written.
* @param fut {@code True} if write future should be created.
+ * @param ackC Closure invoked when message ACK is received.
* @return Write future or {@code null}.
* @throws GridNioException If GridNioException occurred while handling event.
*/
- public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
+ public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+ Object msg,
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException;
/**
* Invoked when a new messages received.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
index 58ddae5..4ede4b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
/**
* Class that defines the piece for application-to-network and vice-versa data conversions
@@ -111,11 +113,12 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
@Override public GridNioFuture<?> proceedSessionWrite(
GridNioSession ses,
Object msg,
- boolean fut
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
checkNext();
- return nextFilter.onSessionWrite(ses, msg, fut);
+ return nextFilter.onSessionWrite(ses, msg, fut, ackC);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
index 8cc690b..ec59020 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.lang.IgniteInClosure;
/**
* Filter chain implementation for nio server filters.
@@ -184,9 +186,10 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
@Override public GridNioFuture<?> onSessionWrite(
GridNioSession ses,
Object msg,
- boolean fut
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
- return tail.onSessionWrite(ses, msg, fut);
+ return tail.onSessionWrite(ses, msg, fut, ackC);
}
/**
@@ -259,9 +262,11 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut)
- throws IgniteCheckedException {
- return proceedSessionWrite(ses, msg, fut);
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+ Object msg,
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+ return proceedSessionWrite(ses, msg, fut, ackC);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
index 3d18ab7..2835a22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
@@ -59,11 +59,6 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G
}
/** {@inheritDoc} */
- @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index 6c0c9c6..4d1fee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -43,13 +43,6 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
public boolean skipRecovery();
/**
- * Sets ack closure which will be applied when ack received.
- *
- * @param c Ack closure.
- */
- public void ackClosure(IgniteInClosure<IgniteException> c);
-
- /**
* The method will be called when ack received.
*/
public void onAckReceived();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
index fe97039..6a94a54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
@@ -30,10 +30,17 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
private static final long serialVersionUID = 0L;
/** */
- protected boolean msgThread;
+ private boolean msgThread;
/** */
- protected IgniteInClosure<IgniteException> ackClosure;
+ protected final IgniteInClosure<IgniteException> ackC;
+
+ /**
+ * @param ackC Ack closure.
+ */
+ public GridNioFutureImpl(IgniteInClosure<IgniteException> ackC) {
+ this.ackC = ackC;
+ }
/** {@inheritDoc} */
@Override public void messageThread(boolean msgThread) {
@@ -51,18 +58,13 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
}
/** {@inheritDoc} */
- @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
- ackClosure = closure;
- }
-
- /** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
}
/** {@inheritDoc} */
@Override public IgniteInClosure<IgniteException> ackClosure() {
- return ackClosure;
+ return ackC;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index fefdf15..7f25e40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -74,7 +74,6 @@ import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import sun.nio.ch.DirectBuffer;
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPERATION;
@@ -481,22 +480,26 @@ public class GridNioServer<T> {
* @param ses Session.
* @param msg Message.
* @param createFut {@code True} if future should be created.
+ * @param ackC Closure invoked when message ACK is received.
* @return Future for operation.
*/
- GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException {
+ GridNioFuture<?> send(GridNioSession ses,
+ ByteBuffer msg,
+ boolean createFut,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert ses instanceof GridSelectorNioSessionImpl : ses;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
if (createFut) {
- NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+ NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, ackC);
send0(impl, fut, false);
return fut;
}
else {
- SessionWriteRequest req = new WriteRequestImpl(ses, msg, true);
+ SessionWriteRequest req = new WriteRequestImpl(ses, msg, true, ackC);
send0(impl, req, false);
@@ -508,23 +511,27 @@ public class GridNioServer<T> {
* @param ses Session.
* @param msg Message.
* @param createFut {@code True} if future should be created.
+ * @param ackC Closure invoked when message ACK is received.
* @return Future for operation.
*/
- GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException {
+ GridNioFuture<?> send(GridNioSession ses,
+ Message msg,
+ boolean createFut,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert ses instanceof GridSelectorNioSessionImpl;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
if (createFut) {
NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
- skipRecoveryPred.apply(msg));
+ skipRecoveryPred.apply(msg), ackC);
send0(impl, fut, false);
return fut;
}
else {
- SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg));
+ SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC);
send0(impl, req, false);
@@ -544,11 +551,6 @@ public class GridNioServer<T> {
int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req);
- IgniteInClosure<IgniteException> ackC;
-
- if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
- req.ackClosure(ackC);
-
if (ses.closed()) {
if (ses.removeFuture(req)) {
IOException err = new IOException("Failed to send message (connection was closed): " + ses);
@@ -597,8 +599,11 @@ public class GridNioServer<T> {
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
if (lsnr != null) {
- NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
- skipRecoveryPred.apply(msg));
+ NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl,
+ NioOperation.REQUIRE_WRITE,
+ msg,
+ skipRecoveryPred.apply(msg),
+ null);
fut.listen(lsnr);
@@ -2597,11 +2602,6 @@ public class GridNioServer<T> {
}
/** {@inheritDoc} */
- @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
@Override public void onAckReceived() {
throw new UnsupportedOperationException();
}
@@ -2664,17 +2664,22 @@ public class GridNioServer<T> {
private final boolean skipRecovery;
/** */
- private IgniteInClosure<IgniteException> ackC;
+ private final IgniteInClosure<IgniteException> ackC;
/**
* @param ses Session.
* @param msg Message.
* @param skipRecovery Skip recovery flag.
+ * @param ackC Closure invoked when message ACK is received.
*/
- WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) {
+ WriteRequestImpl(GridNioSession ses,
+ Object msg,
+ boolean skipRecovery,
+ IgniteInClosure<IgniteException> ackC) {
this.ses = ses;
this.msg = msg;
this.skipRecovery = skipRecovery;
+ this.ackC = ackC;
}
/** {@inheritDoc} */
@@ -2693,11 +2698,6 @@ public class GridNioServer<T> {
}
/** {@inheritDoc} */
- @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
- ackC = c;
- }
-
- /** {@inheritDoc} */
@Override public void onAckReceived() {
assert msg instanceof Message;
@@ -2798,6 +2798,8 @@ public class GridNioServer<T> {
boolean accepted,
@Nullable Map<Integer, ?> meta
) {
+ super(null);
+
op = NioOperation.REGISTER;
this.sockCh = sockCh;
@@ -2812,6 +2814,8 @@ public class GridNioServer<T> {
* @param op Requested operation.
*/
NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op) {
+ super(null);
+
assert ses != null || op == NioOperation.DUMP_STATS : "Invalid params [ses=" + ses + ", op=" + op + ']';
assert op != null;
assert op != NioOperation.REGISTER;
@@ -2826,8 +2830,14 @@ public class GridNioServer<T> {
* @param ses Session to change.
* @param op Requested operation.
* @param msg Message.
+ * @param ackC Closure invoked when message ACK is received.
*/
- NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) {
+ NioOperationFuture(GridSelectorNioSessionImpl ses,
+ NioOperation op,
+ Object msg,
+ IgniteInClosure<IgniteException> ackC) {
+ super(ackC);
+
assert ses != null;
assert op != null;
assert op != NioOperation.REGISTER;
@@ -2845,9 +2855,15 @@ public class GridNioServer<T> {
* @param op Requested operation.
* @param commMsg Direct message.
* @param skipRecovery Skip recovery flag.
+ * @param ackC Closure invoked when message ACK is received.
*/
- NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
- Message commMsg, boolean skipRecovery) {
+ NioOperationFuture(GridSelectorNioSessionImpl ses,
+ NioOperation op,
+ Message commMsg,
+ boolean skipRecovery,
+ IgniteInClosure<IgniteException> ackC) {
+ super(ackC);
+
assert ses != null;
assert op != null;
assert op != NioOperation.REGISTER;
@@ -3013,7 +3029,10 @@ public class GridNioServer<T> {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+ Object msg,
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
if (directMode) {
boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
@@ -3032,10 +3051,10 @@ public class GridNioServer<T> {
return null;
}
else
- return send(ses, (Message)msg, fut);
+ return send(ses, (Message)msg, fut, ackC);
}
else
- return send(ses, (ByteBuffer)msg, fut);
+ return send(ses, (ByteBuffer)msg, fut, ackC);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index c1b60ab..21eabf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
/**
@@ -107,8 +109,11 @@ public interface GridNioSession {
/**
* @param msg Message to be sent.
+ * @param ackC Optional closure invoked when ack for message is received.
+ * @throws IgniteCheckedException If failed.
*/
- public void sendNoFuture(Object msg) throws IgniteCheckedException;
+ public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC)
+ throws IgniteCheckedException;
/**
* Gets metadata associated with specified key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 7424531..98a22d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MAX_KEYS_CNT;
@@ -105,7 +107,7 @@ public class GridNioSessionImpl implements GridNioSession {
try {
resetSendScheduleTime();
- return chain().onSessionWrite(this, msg, true);
+ return chain().onSessionWrite(this, msg, true, null);
}
catch (IgniteCheckedException e) {
close();
@@ -115,9 +117,10 @@ public class GridNioSessionImpl implements GridNioSession {
}
/** {@inheritDoc} */
- @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+ @Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC)
+ throws IgniteCheckedException {
try {
- chain().onSessionWrite(this, msg, false);
+ chain().onSessionWrite(this, msg, false, ackC);
}
catch (IgniteCheckedException e) {
close();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index bdb3a29..5385430 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -42,10 +42,7 @@ public enum GridNioSessionMetaKey {
MARSHALLER_ID,
/** Message writer. */
- MSG_WRITER,
-
- /** Ack closure. */
- ACK_CLOSURE;
+ MSG_WRITER;
/** Maximum count of NIO session keys in system. */
public static final int MAX_KEYS_CNT = 64;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index d941bae..ab9b2eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -123,7 +123,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
/** {@inheritDoc} */
@Override public synchronized boolean sendMessage(UUID nodeId, Message msg,
- IgniteInClosure<IgniteException> closure) throws IgniteCheckedException {
+ IgniteInClosure<IgniteException> c) throws IgniteCheckedException {
assert nodeId != null;
if (closed())
@@ -142,8 +142,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
markUsed();
- if (closure != null)
- closure.apply(null);
+ if (c != null)
+ c.apply(null);
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 3397772..eff893f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -32,8 +32,6 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE;
-
/**
* Grid client for NIO server.
*/
@@ -115,18 +113,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
// Node ID is never provided in asynchronous send mode.
assert nodeId == null;
- if (c != null)
- ses.addMeta(ACK_CLOSURE.ordinal(), c);
-
- ses.sendNoFuture(msg);
-
- if (c != null)
- ses.removeMeta(ACK_CLOSURE.ordinal());
+ ses.sendNoFuture(msg, c);
}
catch (IgniteCheckedException e) {
- if (c != null)
- ses.removeMeta(ACK_CLOSURE.ordinal());
-
if (log.isDebugEnabled())
log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
index 508c791..e24f3ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
@@ -42,13 +42,6 @@ public interface SessionWriteRequest {
public boolean skipRecovery();
/**
- * Sets ack closure which will be applied when ack received.
- *
- * @param c Ack closure.
- */
- public void ackClosure(IgniteInClosure<IgniteException> c);
-
- /**
* The method will be called when ack received.
*/
public void onAckReceived();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 8ed7db0..b4bd34a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
@@ -285,10 +286,11 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
@Override public GridNioFuture<?> onSessionWrite(
GridNioSession ses,
Object msg,
- boolean fut
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
if (directMode)
- return proceedSessionWrite(ses, msg, fut);
+ return proceedSessionWrite(ses, msg, fut, ackC);
ByteBuffer input = checkMessage(ses, msg);
@@ -307,13 +309,13 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
if (hnd.isHandshakeFinished()) {
hnd.encrypt(input);
- return hnd.writeNetBuffer();
+ return hnd.writeNetBuffer(ackC);
}
else {
if (log.isDebugEnabled())
log.debug("Write request received during handshake, scheduling deferred write: " + ses);
- return hnd.deferredWrite(input);
+ return hnd.deferredWrite(input, ackC);
}
}
catch (SSLException e) {
@@ -390,7 +392,7 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
try {
hnd.closeOutbound();
- hnd.writeNetBuffer();
+ hnd.writeNetBuffer(null);
}
catch (SSLException e) {
U.warn(log, "Failed to shutdown SSL session gracefully (will force close) [ex=" + e + ", ses=" + ses + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index 269e8b9..e268716 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -27,6 +27,7 @@ import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.nio.GridNioEmbeddedFuture;
import org.apache.ignite.internal.util.nio.GridNioException;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
@@ -274,7 +276,7 @@ class GridNioSslHandler extends ReentrantLock {
log.debug("Wrapped handshake data [status=" + res.getStatus() + ", handshakeStatus=" +
handshakeStatus + ", ses=" + ses + ']');
- writeNetBuffer();
+ writeNetBuffer(null);
break;
}
@@ -412,16 +414,17 @@ class GridNioSslHandler extends ReentrantLock {
* Adds write request to the queue.
*
* @param buf Buffer to write.
+ * @param ackC Closure invoked when message ACK is received.
* @return Write future.
*/
- GridNioFuture<?> deferredWrite(ByteBuffer buf) {
+ GridNioFuture<?> deferredWrite(ByteBuffer buf, IgniteInClosure<IgniteException> ackC) {
assert isHeldByCurrentThread();
GridNioEmbeddedFuture<Object> fut = new GridNioEmbeddedFuture<>();
ByteBuffer cp = copy(buf);
- deferredWriteQueue.offer(new WriteRequest(fut, cp));
+ deferredWriteQueue.offer(new WriteRequest(fut, cp, ackC));
return fut;
}
@@ -437,7 +440,7 @@ class GridNioSslHandler extends ReentrantLock {
while (!deferredWriteQueue.isEmpty()) {
WriteRequest req = deferredWriteQueue.poll();
- req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true));
+ req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true, req.ackC));
}
}
@@ -475,14 +478,15 @@ class GridNioSslHandler extends ReentrantLock {
* Copies data from out net buffer and passes it to the underlying chain.
*
* @return Write future.
+ * @param ackC Closure invoked when message ACK is received.
* @throws GridNioException If send failed.
*/
- GridNioFuture<?> writeNetBuffer() throws IgniteCheckedException {
+ GridNioFuture<?> writeNetBuffer(IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert isHeldByCurrentThread();
ByteBuffer cp = copy(outNetBuf);
- return parent.proceedSessionWrite(ses, cp, true);
+ return parent.proceedSessionWrite(ses, cp, true, ackC);
}
/**
@@ -670,20 +674,27 @@ class GridNioSslHandler extends ReentrantLock {
*/
private static class WriteRequest {
/** Future that should be completed. */
- private GridNioEmbeddedFuture<Object> fut;
+ private final GridNioEmbeddedFuture<Object> fut;
/** Buffer needed to be written. */
- private ByteBuffer buf;
+ private final ByteBuffer buf;
+
+ /** */
+ private final IgniteInClosure<IgniteException> ackC;
/**
* Creates write request.
*
* @param fut Future.
* @param buf Buffer to write.
+ * @param ackC Closure invoked when message ACK is received.
*/
- private WriteRequest(GridNioEmbeddedFuture<Object> fut, ByteBuffer buf) {
+ private WriteRequest(GridNioEmbeddedFuture<Object> fut,
+ ByteBuffer buf,
+ IgniteInClosure<IgniteException> ackC) {
this.fut = fut;
this.buf = buf;
+ this.ackC = ackC;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 35568c3..fe915e5 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -369,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug("Sending local node ID to newly accepted session: " + ses);
try {
- ses.sendNoFuture(nodeIdMessage());
+ ses.sendNoFuture(nodeIdMessage(), null);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send message: " + e, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 8d88876..6005ac9 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -46,19 +46,19 @@ public class IgniteThread extends Thread {
/** The name of the Ignite instance this thread belongs to. */
protected final String igniteInstanceName;
- /** Group index. */
- private final int grpIdx;
-
/** */
private int compositeRwLockIdx;
+ /** */
+ private final int stripe;
+
/**
* Creates thread with given worker.
*
* @param worker Runnable to create thread with.
*/
public IgniteThread(GridWorker worker) {
- this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED);
+ this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1);
}
/**
@@ -69,7 +69,7 @@ public class IgniteThread extends Thread {
* @param r Runnable to execute.
*/
public IgniteThread(String igniteInstanceName, String threadName, Runnable r) {
- this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED);
+ this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1);
}
/**
@@ -79,9 +79,10 @@ public class IgniteThread extends Thread {
* @param threadName Name of thread.
* @param r Runnable to execute.
* @param grpIdx Index within a group.
+ * @param stripe Non-negative stripe number if this thread is striped pool thread.
*/
- public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx) {
- this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx);
+ public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
+ this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx, stripe);
}
/**
@@ -93,14 +94,16 @@ public class IgniteThread extends Thread {
* @param threadName Name of thread.
* @param r Runnable to execute.
* @param grpIdx Thread index within a group.
+ * @param stripe Non-negative stripe number if this thread is striped pool thread.
*/
- public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx) {
+ public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
super(grp, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName));
A.ensure(grpIdx >= -1, "grpIdx >= -1");
this.igniteInstanceName = igniteInstanceName;
- this.grpIdx = compositeRwLockIdx = grpIdx;
+ this.compositeRwLockIdx = grpIdx;
+ this.stripe = stripe;
}
/**
@@ -112,18 +115,15 @@ public class IgniteThread extends Thread {
super(threadGrp, threadName);
this.igniteInstanceName = igniteInstanceName;
- this.grpIdx = compositeRwLockIdx = GRP_IDX_UNASSIGNED;
+ this.compositeRwLockIdx = GRP_IDX_UNASSIGNED;
+ this.stripe = -1;
}
/**
- * Gets name of the grid this thread belongs to.
- *
- * @return Name of the grid this thread belongs to.
- * @deprecated use {@link #getIgniteInstanceName()}
+ * @return Non-negative stripe number if this thread is striped pool thread.
*/
- @Deprecated
- public String getGridName() {
- return getIgniteInstanceName();
+ public int stripe() {
+ return stripe;
}
/**
@@ -136,13 +136,6 @@ public class IgniteThread extends Thread {
}
/**
- * @return Group index.
- */
- public int groupIndex() {
- return grpIdx;
- }
-
- /**
* @return Composite RW lock index.
*/
public int compositeRwLockIndex() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 119ef70..d2f0b15 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -61,7 +61,7 @@ public class IgniteThreadFactory implements ThreadFactory {
/** {@inheritDoc} */
@Override public Thread newThread(@NotNull Runnable r) {
- return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet());
+ return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index da2923f..8cbb596 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -47,7 +47,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
private static final int SAMPLE_CNT = 1;
/** */
- private static final byte DIRECT_TYPE = (byte)210;
+ private static final byte DIRECT_TYPE = -127;
/** */
private int bufSize;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
index 7c0e485..a158f7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
@@ -40,6 +40,13 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
/**
* @throws Exception If failed.
*/
@@ -63,13 +70,12 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
}
/**
- * @param future Future.
+ * @param fut Future.
* @return Internal future.
*/
- private static IgniteInternalFuture internalFuture(IgniteFuture future) {
- assert future instanceof IgniteFutureImpl;
+ private static IgniteInternalFuture internalFuture(IgniteFuture fut) {
+ assert fut instanceof IgniteFutureImpl : fut;
- return ((IgniteFutureImpl)future).internalFuture();
+ return ((IgniteFutureImpl) fut).internalFuture();
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 52c0ac5..09a0d9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -78,8 +78,6 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
cfg.setNetworkTimeout(60_000);
- cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
-
TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
discoSpi.setSocketTimeout(30_000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index 76cf78c..a12b6b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -94,13 +94,6 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
*/
- public void testPartitionedClock() throws Exception {
- checkMessages(false, CLOCK);
- }
-
- /**
- * @throws Exception If failed.
- */
public void testPartitionedPrimary() throws Exception {
checkMessages(false, PRIMARY);
}
@@ -108,13 +101,6 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
*/
- public void testClientClock() throws Exception {
- checkMessages(true, CLOCK);
- }
-
- /**
- * @throws Exception If failed.
- */
public void testClientPrimary() throws Exception {
checkMessages(true, PRIMARY);
}
@@ -206,14 +192,14 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
private Map<Class<?>, AtomicInteger> cntMap = new HashMap<>();
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
AtomicInteger cntr = cntMap.get(((GridIoMessage)msg).message().getClass());
if (cntr != null)
cntr.incrementAndGet();
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
/**
@@ -221,7 +207,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
*
* @param cls Class to count.
*/
- public void registerMessage(Class<?> cls) {
+ void registerMessage(Class<?> cls) {
AtomicInteger cntr = cntMap.get(cls);
if (cntr == null)
@@ -232,7 +218,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
* @param cls Message type to get count.
* @return Number of messages of given class.
*/
- public int messageCount(Class<?> cls) {
+ int messageCount(Class<?> cls) {
AtomicInteger cntr = cntMap.get(cls);
return cntr == null ? 0 : cntr.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index ba37974..dd27d72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 728bf13..a44e49e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -401,9 +401,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
assertEquals(3, msgs.size());
- for (Object msg : msgs)
- assertTrue(((GridNearAtomicFullUpdateRequest)msg).clientRequest());
-
map.put(primaryKey(ignite0.cache(null)), 3);
map.put(primaryKey(ignite1.cache(null)), 4);
map.put(primaryKey(ignite2.cache(null)), 5);
@@ -1693,8 +1690,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
* @throws Exception If failed.
*/
public void testAtomicPrimaryPutAllMultinode() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1685");
-
multinode(PRIMARY, TestType.PUT_ALL);
}
@@ -1702,8 +1697,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
* @throws Exception If failed.
*/
public void testAtomicClockPutAllMultinode() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1685");
-
multinode(CLOCK, TestType.PUT_ALL);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index cb1f6fb4..1d2cd2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -103,7 +103,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
for (int i = 0; i < GRID_CNT; i++) {
final IgniteKernal grid = (IgniteKernal)grid(i);
- GridTestUtils.retryAssert(log, 10, 100, new CA() {
+ GridTestUtils.retryAssert(log, 10, 500, new CA() {
@Override public void apply() {
assertTrue(grid.internalCache().context().mvcc().atomicFutures().isEmpty());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
index 5050300..2600e7b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
@@ -39,9 +39,11 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -702,8 +704,12 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@SuppressWarnings("ConstantConditions")
- private void checkEntry(Ignite ignite, Integer key, @Nullable Integer val, boolean expectNear, UUID... expReaders)
- throws Exception {
+ private void checkEntry(Ignite ignite,
+ Integer key,
+ @Nullable Integer val,
+ boolean expectNear,
+ final UUID... expReaders) throws Exception
+ {
GridCacheAdapter<Integer, Integer> near = ((IgniteKernal) ignite).internalCache();
assertTrue(near.isNear());
@@ -728,11 +734,22 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>)near).dht();
- GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key);
+ final GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key);
if (expectDht) {
assertNotNull("No dht entry for: " + key + ", grid: " + ignite.name(), dhtEntry);
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return dhtEntry.readers().size() == expReaders.length;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 5000);
+
Collection<UUID> readers = dhtEntry.readers();
assertEquals(expReaders.length, readers.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 3942e35..2971f81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -43,10 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -152,7 +152,12 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
@Override protected void afterTest() throws Exception {
super.afterTest();
- ignite(0).destroyCache(null);
+ try {
+ checkInternalCleanup();
+ }
+ finally {
+ ignite(0).destroyCache(null);
+ }
}
/**
@@ -456,22 +461,40 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
for (int i = 0; i < keysCnt; i++)
assertEquals((Integer)iter, cache.get(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkInternalCleanup() throws Exception{
+ checkNoAtomicFutures();
+
+ checkOnePhaseCommitReturnValuesCleaned();
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ void checkNoAtomicFutures() throws Exception {
for (int i = 0; i < GRID_CNT; i++) {
- IgniteKernal ignite = (IgniteKernal)grid(i);
+ final IgniteKernal ignite = (IgniteKernal)grid(i);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ignite.context().cache().context().mvcc().atomicFuturesCount() == 0;
+ }
+ }, 5_000);
Collection<?> futs = ignite.context().cache().context().mvcc().atomicFutures();
assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
}
-
- checkOnePhaseCommitReturnValuesCleaned();
}
/**
- *
+ * @throws Exception If failed.
*/
- protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException {
+ void checkOnePhaseCommitReturnValuesCleaned() throws Exception {
U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT);
for (int i = 0; i < GRID_CNT; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index cc5f548..7460828 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -208,7 +208,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
}
/**
- *
+ * @throws Exception If failed.
*/
public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception {
ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));