You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/23 12:02:48 UTC
[04/33] incubator-ignite git commit: # ignite-1134 avoid hang on nio
session send after nio server stop
# ignite-1134 avoid hang on nio session send after nio server stop
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/79712aa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/79712aa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/79712aa2
Branch: refs/heads/ignite-gg-10561
Commit: 79712aa2ee30ccff6d7e30e7ed13707d6bad40bf
Parents: f62744e
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 21 10:45:39 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 21 10:45:39 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 7 +-
.../ignite/internal/util/nio/GridNioServer.java | 9 ++
.../util/nio/GridSelectorNioSessionImpl.java | 8 ++
.../internal/util/nio/GridNioSelfTest.java | 88 ++++++++++++++++++--
.../internal/util/nio/GridNioSslSelfTest.java | 16 ++--
5 files changed, 107 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 6efa4d8..fbc8c84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -880,7 +880,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
fut.onNodeLeft(e);
}
catch (IgniteCheckedException e) {
- fut.onResult(e);
+ if (!cctx.kernalContext().isStopping())
+ fut.onResult(e);
}
}
@@ -927,7 +928,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert req.transactionNodes() != null;
- //noinspection TryWithIdenticalCatches
try {
cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
@@ -935,7 +935,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
fut.onNodeLeft(e);
}
catch (IgniteCheckedException e) {
- fut.onResult(e);
+ if (!cctx.kernalContext().isStopping())
+ fut.onResult(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 24e1e08..b36f9f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -311,6 +311,9 @@ public class GridNioServer<T> {
U.join(clientWorkers, log);
filterChain.stop();
+
+ for (GridSelectorNioSessionImpl ses : sessions)
+ ses.onServerStopped();
}
}
@@ -1496,6 +1499,9 @@ public class GridNioServer<T> {
req.onDone(e);
}
+
+ if (closed)
+ ses.onServerStopped();
}
catch (ClosedChannelException e) {
U.warn(log, "Failed to register accepted socket channel to selector (channel was closed): "
@@ -1525,6 +1531,9 @@ public class GridNioServer<T> {
sessions.remove(ses);
+ if (closed)
+ ses.onServerStopped();
+
SelectionKey key = ses.key();
// Shutdown input and output so that remote client will see correct socket close.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index cf240ca..458786b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -290,6 +290,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
return super.addMeta(key, val);
}
+ /**
+ * Releases 1,000,000 permits on {@code sem} when it is non-null; per the commit subject this avoids a hang on session send after the NIO server stops (NOTE(review): {@code sem} is presumably the send-queue-limit semaphore - confirm).
+ */
+ void onServerStopped() {
+ if (sem != null)
+ sem.release(1_000_000);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index c81ed56..fa8d4a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -393,6 +393,62 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testSendAfterServerStop() throws Exception {
+ final AtomicReference<GridNioSession> sesRef = new AtomicReference<>();
+
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+
+ GridNioServerListener lsnr = new GridNioServerListenerAdapter() {
+ @Override public void onConnected(GridNioSession ses) {
+ info("On connected: " + ses);
+
+ sesRef.set(ses);
+
+ connectLatch.countDown();
+ }
+
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ }
+
+ @Override public void onMessage(GridNioSession ses, Object msg) {
+ log.info("Message: " + msg);
+ }
+ };
+
+ GridNioServer.Builder<?> builder = serverBuilder(PORT, new GridPlainParser(), lsnr);
+
+ GridNioServer<?> srvr = builder.sendQueueLimit(5).build();
+
+ srvr.start();
+
+ try {
+ Socket s = createSocket();
+
+ s.connect(new InetSocketAddress(U.getLocalHost(), PORT), 1000);
+
+ s.getOutputStream().write(new byte[1]);
+
+ U.await(connectLatch);
+
+ GridNioSession ses = sesRef.get();
+
+ assertNotNull(ses);
+
+ ses.send(new byte[1]);
+
+ srvr.stop();
+
+ for (int i = 0; i < 10; i++)
+ ses.send(new byte[1]);
+ }
+ finally {
+ srvr.stop();
+ }
+ }
+
+ /**
* Sends message and validates reply.
*
* @param msg Message to send.
@@ -480,10 +536,29 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
* @return Started server.
* @throws Exception If failed.
*/
- @SuppressWarnings("unchecked")
- protected GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr)
+ protected final GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr)
throws Exception {
- GridNioServer<?> srvr = GridNioServer.builder()
+ GridNioServer<?> srvr = serverBuilder(port, parser, lsnr).build();
+
+ srvr.start();
+
+ return srvr;
+ }
+
+ /**
+ * @param port Port to listen.
+ * @param parser Parser to use.
+ * @param lsnr Listener.
+ * @return Server builder.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ protected GridNioServer.Builder<?> serverBuilder(int port,
+ GridNioParser parser,
+ GridNioServerListener lsnr)
+ throws Exception
+ {
+ return GridNioServer.builder()
.address(U.getLocalHost())
.port(port)
.listener(lsnr)
@@ -496,12 +571,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
.socketSendBufferSize(0)
.socketReceiveBufferSize(0)
.sendQueueLimit(0)
- .filters(new GridNioCodecFilter(parser, log, false))
- .build();
-
- srvr.start();
-
- return srvr;
+ .filters(new GridNioCodecFilter(parser, log, false));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java
index 930b5d1..73f5ba5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java
@@ -52,9 +52,12 @@ public class GridNioSslSelfTest extends GridNioSelfTest {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override protected GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr)
- throws Exception {
- GridNioServer<?> srvr = GridNioServer.builder()
+ @Override protected GridNioServer.Builder<?> serverBuilder(int port,
+ GridNioParser parser,
+ GridNioServerListener lsnr)
+ throws Exception
+ {
+ return GridNioServer.builder()
.address(U.getLocalHost())
.port(port)
.listener(lsnr)
@@ -69,12 +72,7 @@ public class GridNioSslSelfTest extends GridNioSelfTest {
.sendQueueLimit(0)
.filters(
new GridNioCodecFilter(parser, log, false),
- new GridNioSslFilter(sslCtx, log))
- .build();
-
- srvr.start();
-
- return srvr;
+ new GridNioSslFilter(sslCtx, log));
}
/** {@inheritDoc} */