You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:42 UTC
[19/50] [abbrv] ignite git commit: IGNITE-3054 - WIP
IGNITE-3054 - WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2274e0c2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2274e0c2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2274e0c2
Branch: refs/heads/ignite-3054
Commit: 2274e0c2b82b6a6fcdf2ffd6598636878cd77b08
Parents: 3b96fbe
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Dec 2 11:28:29 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Dec 2 11:28:29 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 48 ++++----------------
1 file changed, 8 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2274e0c2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index de38709..0e0fd4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -208,9 +208,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Node ID in GridNioSession. */
private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
- /** Size of the send queue. */
- private static final int MSG_QUEUE_SIZE_META = GridNioSessionMetaKey.nextUniqueKey();
-
/**
* Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
* <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
@@ -427,7 +424,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nioSem = new Semaphore(spi.getClientNioThreads());
- clientNioSrv = createClientNioServer();
+ clientNioSrv = createClientNioServer(gridName);
clientNioSrv.start();
nioClientProcessingPool = new IgniteThreadPoolExecutor(
@@ -479,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @return New NIO server.
*/
- private GridNioServer<byte[]> createClientNioServer() {
+ private GridNioServer<byte[]> createClientNioServer(String gridName) {
final GridNioServer<byte[]> srv;
final ArrayList<GridNioFilter> filters = new ArrayList<>();
@@ -493,9 +490,11 @@ class ServerImpl extends TcpDiscoveryImpl {
ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length) : ByteBuffer.wrap(msg0);
- res.put(msg0);
+ if (directBuf) {
+ res.put(msg0);
- res.flip();
+ res.flip();
+ }
return res;
}
@@ -520,11 +519,6 @@ class ServerImpl extends TcpDiscoveryImpl {
final long writeTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
spi.getSocketTimeout();
- String gridName = "client";
-
- if (Thread.currentThread() instanceof IgniteThread)
- gridName = ((IgniteThread)Thread.currentThread()).getGridName();
-
srv = GridNioServer.<byte[]>builder().address(U.getLocalHost())
.port(-1)
.listener(clientLsnr)
@@ -542,11 +536,6 @@ class ServerImpl extends TcpDiscoveryImpl {
.gridName(gridName)
.daemon(false)
.writeTimeout(writeTimeout)
- .messageQueueSizeListener(new IgniteBiInClosure<GridNioSession, Integer>() {
- @Override public void apply(GridNioSession ses, Integer size) {
- ses.addMeta(MSG_QUEUE_SIZE_META, size);
- }
- })
.build();
}
catch (Exception e) {
@@ -5703,8 +5692,6 @@ class ServerImpl extends TcpDiscoveryImpl {
if (state == WorkerState.STOPPED)
return new GridFinishedFuture<>();
- log.warning("== " + ses);
-
final IgniteInternalFuture<Object> res = ses.close().chain(new C1<IgniteInternalFuture<Boolean>, Object>() {
@Override public Object apply(final IgniteInternalFuture<Boolean> fut) {
try {
@@ -5858,16 +5845,6 @@ class ServerImpl extends TcpDiscoveryImpl {
return state;
}
- /**
- * @return Send queue size after adding message, in other words it may return 0
- * only when no messages were sent after creation.
- */
- int queueSize() {
- Integer size = ses.meta(MSG_QUEUE_SIZE_META);
-
- return size == null ? 0 : size;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ClientNioMessageWorker.class, this);
@@ -7780,27 +7757,19 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- boolean skipWait = false;
- final int batchSize = 50;
-
while (!isCancelled()) {
for (final ClientNioMessageWorker worker : nioWorkers) {
if (worker.state == WorkerState.JOINED && worker.sending.compareAndSet(false, true)) {
T2<TcpDiscoveryAbstractMessage, byte[]> msg;
- if (worker.queueSize() < 2) {
- int cnt = batchSize;
-
do {
msg = worker.msgQueue.poll();
if (msg != null)
worker.sendMessage(msg.get1(), msg.get2());
- skipWait = msg != null;
}
- while (msg != null && --cnt > 0);
- }
+ while (msg != null);
boolean res = worker.sending.compareAndSet(true, false);
@@ -7808,8 +7777,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (!skipWait)
- nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
+ nioSem.tryAcquire(1000, TimeUnit.MILLISECONDS);
}
}
}