You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/25 08:39:59 UTC
[17/18] ignite git commit: ignite-4003 Preliminary changes in
GridNioServer for async connect
ignite-4003 Preliminary changes in GridNioServer for async connect
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79d47f87
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79d47f87
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79d47f87
Branch: refs/heads/ignite-6149
Commit: 79d47f87383411319254c9182e09c30158cbd72d
Parents: e1eb1b9
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 25 11:07:57 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 25 11:07:57 2017 +0300
----------------------------------------------------------------------
.../GridClientConnectionManagerAdapter.java | 1 -
.../connection/GridClientNioTcpConnection.java | 2 +-
.../internal/util/GridSpinReadWriteLock.java | 2 +-
.../internal/util/nio/GridNioKeyAttachment.java | 33 +++
.../util/nio/GridNioRecoveryDescriptor.java | 3 +-
.../ignite/internal/util/nio/GridNioServer.java | 248 +++++++++++++++----
.../util/nio/GridSelectorNioSessionImpl.java | 28 +--
.../internal/util/nio/ssl/GridNioSslFilter.java | 12 +-
.../communication/tcp/TcpCommunicationSpi.java | 29 ++-
.../IgniteCacheMessageWriteTimeoutTest.java | 4 +-
.../internal/util/nio/GridNioSelfTest.java | 2 +-
.../spi/GridTcpSpiForwardingSelfTest.java | 1 +
.../GridAbstractCommunicationSelfTest.java | 27 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 28 ++-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 39 ++-
...GridTcpCommunicationSpiRecoverySelfTest.java | 47 +++-
...CommunicationRecoveryAckClosureSelfTest.java | 36 ++-
.../tcp/TcpCommunicationSpiDropNodesTest.java | 3 +-
.../HadoopExternalCommunication.java | 5 +-
19 files changed, 446 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index e325897..aa06322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
sslFilter.directMode(false);
- sslFilter.clientMode(true);
filters = new GridNioFilter[]{codecFilter, sslFilter};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 18ce6e4..f72a009 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -235,7 +235,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
}
- ses = (GridNioSession)srv.createSession(ch, meta).get();
+ ses = (GridNioSession)srv.createSession(ch, meta, false, null).get();
if (sslHandshakeFut != null)
sslHandshakeFut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 4f23979..8fef887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -70,7 +70,7 @@ public class GridSpinReadWriteLock {
private int writeLockEntryCnt;
/**
- * Acquires write lock.
+ * Acquires read lock.
*/
@SuppressWarnings("BusyWait")
public void readLock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java
new file mode 100644
index 0000000..0c6ec2e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+/**
+ * Common type of objects attached to NIO selection keys (a session or a session future without session yet).
+ */
+interface GridNioKeyAttachment {
+ /**
+ * @return {@code True} if session was created.
+ */
+ boolean hasSession();
+
+ /**
+ * @return Session if it was created.
+ */
+ GridSelectorNioSessionImpl session();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 7496728..af7b757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,7 +31,6 @@ import org.jetbrains.annotations.Nullable;
/**
* Recovery information for single node.
*/
-@Deprecated // To be splitted into separate classes for in/out data when do not need maintain backward compatibility.
public class GridNioRecoveryDescriptor {
/** Number of acknowledged messages. */
private long acked;
@@ -260,7 +259,7 @@ public class GridNioRecoveryDescriptor {
/**
* @param node Node.
- * @return {@code True} if node is not null and has the same order as initial remtoe node.
+ * @return {@code True} if node is not null and has the same order as initial remote node.
*/
public boolean nodeAlive(@Nullable ClusterNode node) {
return node != null && node.order() == this.node.order();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a04a735..0dd7dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -113,6 +113,12 @@ public class GridNioServer<T> {
/** SSL write buf limit. */
private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
+ /** Recovery descriptor meta key. */
+ public static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Meta key for the index of the client worker a request was offered to. */
+ private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
/** */
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -465,7 +471,7 @@ public class GridNioServer<T> {
* @return Future for operation.
*/
public GridNioFuture<Boolean> close(GridNioSession ses) {
- assert ses instanceof GridSelectorNioSessionImpl;
+ assert ses instanceof GridSelectorNioSessionImpl : ses;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -811,17 +817,32 @@ public class GridNioServer<T> {
*
* @param ch Channel to register within the server and create session for.
* @param meta Optional meta for new session.
+ * @param async Async connection.
+ * @param lsnr Listener that should be invoked in NIO thread.
* @return Future to get session.
*/
- public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
- @Nullable Map<Integer, ?> meta) {
+ public GridNioFuture<GridNioSession> createSession(
+ final SocketChannel ch,
+ @Nullable Map<Integer, Object> meta,
+ boolean async,
+ @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr
+ ) {
try {
if (!closed) {
ch.configureBlocking(false);
NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
- offerBalanced(req);
+ if (async) {
+ assert meta != null;
+
+ req.op = NioOperation.CONNECT;
+ }
+
+ if (lsnr != null)
+ req.listen(lsnr);
+
+ offerBalanced(req, meta);
return req;
}
@@ -835,6 +856,29 @@ public class GridNioServer<T> {
}
/**
+ * @param ch Channel of the pending connect to cancel (its selector key is cancelled and the channel is closed).
+ * @param meta Session meta passed on session creation, must hold index of the worker the request was offered to.
+ */
+ public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) {
+ if (!closed) {
+ NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+
+ req.op = NioOperation.CANCEL_CONNECT;
+
+ Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
+
+ assert idx != null : meta;
+
+ clientWorkers.get(idx).offer(req);
+
+ return req;
+ }
+ else
+ return new GridNioFinishedFuture<>(
+ new IgniteCheckedException("Failed to cancel connection, server is stopped."));
+ }
+
+ /**
* Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
*
* @return Write timeout in milliseconds.
@@ -920,9 +964,10 @@ public class GridNioServer<T> {
/**
* @param req Request to balance.
+ * @param meta Session metadata.
*/
- private synchronized void offerBalanced(NioOperationFuture req) {
- assert req.operation() == NioOperation.REGISTER : req;
+ private synchronized void offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
+ assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT : req;
assert req.socketChannel() != null : req;
int workers = clientWorkers.size();
@@ -960,6 +1005,9 @@ public class GridNioServer<T> {
else
balanceIdx = 0;
+ if (meta != null)
+ meta.put(WORKER_IDX_META_KEY, balanceIdx);
+
clientWorkers.get(balanceIdx).offer(req);
}
@@ -1788,6 +1836,38 @@ public class GridNioServer<T> {
while ((req0 = changeReqs.poll()) != null) {
switch (req0.operation()) {
+ case CONNECT: {
+ NioOperationFuture fut = (NioOperationFuture)req0;
+
+ SocketChannel ch = fut.socketChannel();
+
+ try {
+ ch.register(selector, SelectionKey.OP_CONNECT, fut);
+ }
+ catch (IOException e) {
+ fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
+ }
+
+ break;
+ }
+
+ case CANCEL_CONNECT: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SocketChannel ch = req.socketChannel();
+
+ SelectionKey key = ch.keyFor(selector);
+
+ if (key != null)
+ key.cancel();
+
+ U.closeQuiet(ch);
+
+ req.onDone();
+
+ break;
+ }
+
case REGISTER: {
register((NioOperationFuture)req0);
@@ -1998,8 +2078,12 @@ public class GridNioServer<T> {
log.debug("Closing all connected client sockets.");
// Close all channels registered with selector.
- for (SelectionKey key : selector.keys())
- close((GridSelectorNioSessionImpl)key.attachment(), null);
+ for (SelectionKey key : selector.keys()) {
+ GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
+
+ if (attach != null && attach.hasSession())
+ close(attach.session(), null);
+ }
if (log.isDebugEnabled())
log.debug("Closing NIO selector.");
@@ -2173,11 +2257,17 @@ public class GridNioServer<T> {
if (!key.isValid())
continue;
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
- assert ses != null;
+ assert attach != null;
try {
+ if (!attach.hasSession() && key.isConnectable()) {
+ processConnect(key);
+
+ continue;
+ }
+
if (key.isReadable())
processRead(key);
@@ -2196,7 +2286,12 @@ public class GridNioServer<T> {
// No-op.
}
- U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ GridSelectorNioSessionImpl ses = attach.session();
+
+ if (!closed)
+ U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
close(ses, new GridNioException(e));
}
@@ -2225,11 +2320,17 @@ public class GridNioServer<T> {
if (!key.isValid())
continue;
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
- assert ses != null;
+ assert attach != null;
try {
+ if (!attach.hasSession() && key.isConnectable()) {
+ processConnect(key);
+
+ continue;
+ }
+
if (key.isReadable())
processRead(key);
@@ -2248,10 +2349,12 @@ public class GridNioServer<T> {
// No-op.
}
- if (!closed)
- U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ GridSelectorNioSessionImpl ses = attach.session();
- close(ses, new GridNioException(e));
+ if (!closed)
+ U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
}
}
}
@@ -2265,7 +2368,12 @@ public class GridNioServer<T> {
long now = U.currentTimeMillis();
for (SelectionKey key : keys) {
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
+
+ if (attach == null || !attach.hasSession())
+ continue;
+
+ GridSelectorNioSessionImpl ses = attach.session();
try {
long writeTimeout0 = writeTimeout;
@@ -2303,12 +2411,12 @@ public class GridNioServer<T> {
/**
* Registers given socket channel to the selector, creates a session and notifies the listener.
*
- * @param req Registration request.
+ * @param fut Registration future.
*/
- private void register(NioOperationFuture<GridNioSession> req) {
- assert req != null;
+ private void register(NioOperationFuture<GridNioSession> fut) {
+ assert fut != null;
- SocketChannel sockCh = req.socketChannel();
+ SocketChannel sockCh = fut.socketChannel();
assert sockCh != null;
@@ -2334,24 +2442,53 @@ public class GridNioServer<T> {
filterChain,
(InetSocketAddress)sockCh.getLocalAddress(),
(InetSocketAddress)sockCh.getRemoteAddress(),
- req.accepted(),
+ fut.accepted(),
sndQueueLimit,
writeBuf,
readBuf);
- Map<Integer, ?> meta = req.meta();
+ Map<Integer, ?> meta = fut.meta();
if (meta != null) {
for (Entry<Integer, ?> e : meta.entrySet())
ses.addMeta(e.getKey(), e.getValue());
+
+ if (!ses.accepted()) {
+ GridNioRecoveryDescriptor desc =
+ (GridNioRecoveryDescriptor)meta.get(RECOVERY_DESC_META_KEY);
+
+ if (desc != null) {
+ ses.outRecoveryDescriptor(desc);
+
+ if (!desc.pairedConnections())
+ ses.inRecoveryDescriptor(desc);
+ }
+ }
}
- SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+ SelectionKey key;
- ses.key(key);
+ if (!sockCh.isRegistered()) {
+ assert fut.op == NioOperation.REGISTER : fut.op;
+
+ key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+
+ ses.key(key);
- if (!ses.accepted())
resend(ses);
+ }
+ else {
+ assert fut.op == NioOperation.CONNECT : fut.op;
+
+ key = sockCh.keyFor(selector);
+
+ key.attach(ses);
+
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+
+ ses.key(key);
+ }
sessions.add(ses);
workerSessions.add(ses);
@@ -2359,12 +2496,12 @@ public class GridNioServer<T> {
try {
filterChain.onSessionOpened(ses);
- req.onDone(ses);
+ fut.onDone(ses);
}
catch (IgniteCheckedException e) {
close(ses, e);
- req.onDone(e);
+ fut.onDone(e);
}
if (closed)
@@ -2486,6 +2623,31 @@ public class GridNioServer<T> {
}
/**
+ * @param key Selection key ready for connect, its attachment is the session future.
+ * @throws IOException If connect failed (channel is closed and session future is completed with error first).
+ */
+ @SuppressWarnings("unchecked")
+ private void processConnect(SelectionKey key) throws IOException {
+ SocketChannel ch = (SocketChannel)key.channel();
+
+ NioOperationFuture<GridNioSession> sesFut = (NioOperationFuture<GridNioSession>)key.attachment();
+
+ assert sesFut != null;
+
+ try {
+ if (ch.finishConnect())
+ register(sesFut);
+ }
+ catch (IOException e) {
+ U.closeQuiet(ch);
+
+ sesFut.onDone(new GridNioException("Failed to connect to node", e));
+
+ throw e;
+ }
+ }
+
+ /**
* Processes read-available event on the key.
*
* @param key Key that is ready to be read.
@@ -2690,7 +2852,7 @@ public class GridNioServer<T> {
if (log.isDebugEnabled())
log.debug("Accepted new client connection: " + sockCh.socket().getRemoteSocketAddress());
- addRegistrationReq(sockCh);
+ addRegistrationRequest(sockCh);
}
}
}
@@ -2701,15 +2863,21 @@ public class GridNioServer<T> {
*
* @param sockCh Socket channel to be registered on one of the selectors.
*/
- private void addRegistrationReq(SocketChannel sockCh) {
- offerBalanced(new NioOperationFuture(sockCh));
+ private void addRegistrationRequest(SocketChannel sockCh) {
+ offerBalanced(new NioOperationFuture<>(sockCh, true, null), null);
}
}
/**
* Asynchronous operation that may be requested on selector.
*/
- enum NioOperation {
+ private enum NioOperation {
+ /** Register connect key selection. */
+ CONNECT,
+
+ /** Cancel connect. */
+ CANCEL_CONNECT,
+
/** Register read key selection. */
REGISTER,
@@ -2914,7 +3082,7 @@ public class GridNioServer<T> {
* Class for requesting write and session close operations.
*/
private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest,
- SessionChangeRequest {
+ SessionChangeRequest, GridNioKeyAttachment {
/** Socket channel in register request. */
@GridToStringExclude
private SocketChannel sockCh;
@@ -2942,15 +3110,6 @@ public class GridNioServer<T> {
private boolean skipRecovery;
/**
- * Creates registration request for a given socket channel.
- *
- * @param sockCh Socket channel to register on selector.
- */
- NioOperationFuture(SocketChannel sockCh) {
- this(sockCh, true, null);
- }
-
- /**
* @param sockCh Socket channel.
* @param accepted {@code True} if socket has been accepted.
* @param meta Optional meta.
@@ -3038,6 +3197,11 @@ public class GridNioServer<T> {
}
/** {@inheritDoc} */
+ @Override public boolean hasSession() {
+ return ses != null;
+ }
+
+ /** {@inheritDoc} */
public NioOperation operation() {
return op;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 66f9176..71a3b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -37,7 +37,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
* Note that this implementation requires non-null values for local and remote
* socket addresses.
*/
-class GridSelectorNioSessionImpl extends GridNioSessionImpl {
+class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment {
/** Pending write requests. */
private final ConcurrentLinkedDeque8<SessionWriteRequest> queue = new ConcurrentLinkedDeque8<>();
@@ -129,6 +129,16 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
}
}
+ /** {@inheritDoc} */
+ @Override public boolean hasSession() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridSelectorNioSessionImpl session() {
+ return this;
+ }
+
/**
* @return Worker.
*/
@@ -385,22 +395,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
return inRecovery;
}
- /** {@inheritDoc} */
- @Override public <T> T addMeta(int key, @Nullable T val) {
- if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
- outRecovery = (GridNioRecoveryDescriptor)val;
-
- if (!outRecovery.pairedConnections())
- inRecovery = outRecovery;
-
- outRecovery.onConnected();
-
- return null;
- }
- else
- return super.addMeta(key, val);
- }
-
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index b4bd34a..f8a0dce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -69,9 +69,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
/** Allocate direct buffer or heap buffer. */
private boolean directBuf;
- /** Whether SSLEngine should use client mode. */
- private boolean clientMode;
-
/** Whether direct mode is used. */
private boolean directMode;
@@ -93,13 +90,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
}
/**
- * @param clientMode Flag indicating whether SSLEngine should use client mode..
- */
- public void clientMode(boolean clientMode) {
- this.clientMode = clientMode;
- }
-
- /**
*
* @param directMode Flag indicating whether direct mode is used.
*/
@@ -164,6 +154,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
if (sslMeta == null) {
engine = sslCtx.createSSLEngine();
+ boolean clientMode = !ses.accepted();
+
engine.setUseClientMode(clientMode);
if (!clientMode) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index bab9cfa..7a54666 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2980,14 +2980,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
- * Establish TCP connection to remote node and returns client.
- *
- * @param node Remote node.
- * @param connIdx Connection index.
- * @return Client.
- * @throws IgniteCheckedException If failed.
+ * @param node Node.
+ * @return Node addresses.
+ * @throws IgniteCheckedException If node does not have addresses.
*/
- protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ private LinkedHashSet<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -3052,6 +3049,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
}
+ return addrs;
+ }
+
+ /**
+ * Establish TCP connection to remote node and returns client.
+ *
+ * @param node Remote node.
+ * @param connIdx Connection index.
+ * @return Client.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
+
boolean conn = false;
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
@@ -3162,10 +3173,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt);
- meta.put(-1, recoveryDesc);
+ meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
}
- GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+ GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
client = new GridTcpNioCommunicationClient(connIdx, ses, log);
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index b3b5d1a..3ba319b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
// Try provoke connection close on socket writeTimeout.
commSpi.setSharedMemoryPort(-1);
commSpi.setMessageQueueLimit(10);
- commSpi.setSocketReceiveBuffer(40);
- commSpi.setSocketSendBuffer(40);
+ commSpi.setSocketReceiveBuffer(64);
+ commSpi.setSocketSendBuffer(64);
commSpi.setSocketWriteTimeout(100);
commSpi.setUnacknowledgedMessagesBufferSize(1000);
commSpi.setConnectTimeout(10_000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 4dbb7ce..e623467 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
try {
SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port()));
- GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null);
+ GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null, false, null);
ses = fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 1e25003..5d8e316 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.BasicAddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteCallable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 88276c2..e51dac8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
private static final Object mux = new Object();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
protected boolean useSsl = false;
/**
@@ -289,6 +294,12 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart(true);
+
for (int i = 0; i < getSpiCount(); i++) {
CommunicationSpi<Message> spi = getSpi(i);
@@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
- node.order(i);
-
GridSpiTestContext ctx = initSpiContext();
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
info(">>> Initialized context: nodeId=" + ctx.localNode().id());
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
node.setAttributes(spi.getNodeAttributes());
node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
+ node.order(i + 1);
+
nodes.add(node);
spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -346,6 +361,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis.values()) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 78bf869..ce96c55 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
protected static final List<ClusterNode> nodes = new ArrayList<>();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
private static int port = 60_000;
/** Use ssl. */
@@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart(true);
+
for (int i = 0; i < SPI_CNT; i++) {
CommunicationSpi<Message> spi = createSpi();
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+ node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false);
+
node.order(i + 1);
GridSpiTestContext ctx = initSpiContext();
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
info(">>> Initialized context: nodeId=" + ctx.localNode().id());
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index feaae11..1467c29 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -64,6 +66,9 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
protected static final List<ClusterNode> nodes = new ArrayList<>();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
private static final int SPI_CNT = 2;
/**
@@ -159,6 +164,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0));
}
+ U.sleep(500);
+
expMsgs += msgPerIter;
final long totAcked0 = totAcked;
@@ -166,9 +173,13 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
for (TcpCommunicationSpi spi : spis) {
GridNioServer srv = U.field(spi, "nioSrvr");
- Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+ final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
- assertFalse(sessions.isEmpty());
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return !sessions.isEmpty();
+ }
+ }, 5_000);
boolean found = false;
@@ -282,7 +293,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
int sentMsgs = 1;
- for (int i = 0; i < 150; i++) {
+ for (int i = 0; i < 1280; i++) {
try {
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
@@ -379,11 +390,15 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart(true);
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -392,14 +407,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
spi.setListener(new TestListener());
node.setAttributes(spi.getNodeAttributes());
+ node.order(i);
+
nodes.add(node);
spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -455,6 +476,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 2a043ee..c722577 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
/** Use ssl. */
protected boolean useSsl;
+ /** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
/**
*
*/
@@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @return Timeout.
*/
protected long awaitForSocketWriteTimeout() {
- return 8000;
+ return 20_000;
}
/**
@@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
final AtomicInteger sentCnt = new AtomicInteger(1);
int errCnt = 0;
@@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
expCnt1.incrementAndGet();
int errCnt = 0;
@@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
ses1.resumeReads().get();
}
catch (IgniteCheckedException ignore) {
- // Can fail is ses1 was closed.
+ // Can fail if ses1 was closed.
}
// Wait when session is closed, then try to open new connection from node1.
@@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
final AtomicInteger sentCnt = new AtomicInteger(1);
int errCnt = 0;
@@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart(true);
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(i);
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 3f58055..d1689bd 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
/** */
private static final int SPI_CNT = 2;
+ /** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
/**
*
*/
@@ -98,8 +103,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
- info("Test listener received message: " + msg);
-
assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
GridTestMessage msg0 = (GridTestMessage)msg;
@@ -171,6 +174,17 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC);
+
+ if (j == 0) {
+ final TestListener lsnr0 = (TestListener)spi0.getListener();
+ final TestListener lsnr1 = (TestListener)spi1.getListener();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr0.rcvCnt.get() >= 1 && lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+ }
}
expMsgs += msgPerIter;
@@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart(true);
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
@@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
@@ -436,6 +458,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
node.setAttributes(spi.getNodeAttributes());
+ node.order(i);
+
nodes.add(node);
spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -491,6 +515,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index e215a34..842f283 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -202,8 +202,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
grid(0).events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event event) {
+ @Override public boolean apply(Event evt) {
latch.countDown();
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/79d47f87/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
index 8a20eec..a241a04 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -1004,7 +1004,10 @@ public class HadoopExternalCommunication {
HandshakeFinish fin = new HandshakeFinish();
- GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
+ GridNioFuture<GridNioSession> sesFut =
+ nioSrvr.createSession(ch, F.<Integer, Object>asMap(HANDSHAKE_FINISH_META, fin), false, null);
+
+ GridNioSession ses = sesFut.get();
client = new HadoopTcpNioCommunicationClient(ses);