You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/09 11:27:42 UTC
[20/25] ignite git commit: IGNITE-3220 I/O bottleneck on
server/client cluster configuration Communications optimizations: -
possibility to open separate in/out connections - possibility to have
multiple connections between nodes - implemented NIO sessio
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 1b11688..87d9225 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -611,6 +611,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
+ @Override public int partition() {
+ return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; // First entry of partIds, or -1 when the list is null or empty.
+ }
+
+ /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 1c1addd..c3e9fbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -125,6 +125,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
);
}
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partId;
+ }
+
/**
* @param key Key to add.
* @param val Optional update value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index fa7f367..4272a4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -244,6 +244,11 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
return accessTtl;
}
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; // First entry of partIds, or -1 when the list is null or empty.
+ }
+
/**
* @param ctx Cache context.
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a419887..bc16ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1585,6 +1585,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
*/
@SuppressWarnings("unchecked")
protected IgniteInternalFuture asyncOp(final Callable<?> op) {
+ if (!asyncToggled)
+ return ctx.closures().callLocalSafe(op);
+
IgniteInternalFuture fail = asyncOpAcquire();
if (fail != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index d34047e..eb5e214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -272,7 +272,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
false,
null,
req.keyValueFilter(),
- req.partition(),
+ req.partition() == -1 ? null : req.partition(),
req.className(),
req.clause(),
req.includeMetaData(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 60c4662..9f965d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -121,7 +121,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
private int taskHash;
/** Partition. */
- private int part;
+ private int part = -1;
/** */
private AffinityTopologyVersion topVer;
@@ -478,8 +478,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/**
* @return partition.
*/
- @Nullable public Integer partition() {
- return part == -1 ? null : part;
+ public int partition() {
+ return part;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6d21dcf..393fb1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -391,7 +391,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
- AffinityTopologyVersion topVer,
+ final AffinityTopologyVersion topVer,
final boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
@@ -472,7 +472,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CacheObject cacheVal = cacheCtx.toCacheObject(val);
while (true) {
- GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+ GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
try {
GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null);
@@ -1507,7 +1507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
assert txEntry != null || readCommitted() || skipVals;
- GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+ GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached();
if (readCommitted() || skipVals) {
cacheCtx.evicts().touch(e, topologyVersion());
@@ -1658,7 +1658,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
IgniteTxLocalAdapter.this,
/*swap*/cacheCtx.isSwapOrOffheapEnabled(),
/*unmarshal*/true,
- /**update-metrics*/true,
+ /*update-metrics*/true,
/*event*/!skipVals,
CU.subjectId(IgniteTxLocalAdapter.this, cctx),
transformClo,
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 32fda87..fee4dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (!allowOverwrite)
cctx.topology().readLock();
+ GridDhtTopologyFuture topWaitFut = null;
+
try {
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
@@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
}
- else {
- fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
- localUpdate(nodeId, req, updater, topic);
- }
- });
- }
+ else
+ topWaitFut = fut;
}
finally {
if (!allowOverwrite)
cctx.topology().readUnlock();
}
+ if (topWaitFut != null) {
+ // 'listen' must be called only after the topology read lock is released.
+ topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+ localUpdate(nodeId, req, updater, topic);
+ }
+ });
+
+ return;
+ }
+
if (job != null) {
try {
job.call();
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 3405b53..4c037b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -226,7 +226,7 @@ public class IgfsContext {
*/
public void runInIgfsThreadPool(Runnable r) {
try {
- igfsSvc.submit(r);
+ igfsSvc.execute(r);
}
catch (RejectedExecutionException ignored) {
// This exception will happen if network speed is too low and data comes faster
@@ -252,4 +252,4 @@ public class IgfsContext {
return mgr;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e534800..4490a68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.igfs;
+import java.util.concurrent.Executor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
@@ -36,6 +37,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -325,6 +327,8 @@ public class IgfsDataManager extends IgfsManager {
IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key);
if (secReader != null) {
+ Executor exec = igfsCtx.kernalContext().pools().poolForPolicy(GridIoPolicy.IGFS_POOL);
+
fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() {
@Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException {
byte[] res = fut.get();
@@ -365,7 +369,7 @@ public class IgfsDataManager extends IgfsManager {
return res;
}
- });
+ }, exec);
}
else
igfsCtx.metrics().addReadBlocks(1, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index ab4ee85..6b23e80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -326,7 +326,7 @@ public final class IgfsImpl implements IgfsEx {
// Submit it to the thread pool immediately.
assert dualPool != null;
- dualPool.submit(batch);
+ dualPool.execute(batch);
// Spin in case another batch is currently running.
while (true) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index 9388a8e..7cba9bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -139,6 +139,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
.logger(log)
.selectorCount(DFLT_SELECTOR_CNT)
.gridName(ctx.gridName())
+ .serverName("odbc")
.tcpNoDelay(DFLT_TCP_NODELAY)
.directBuffer(DFLT_TCP_DIRECT_BUF)
.byteOrder(ByteOrder.nativeOrder())
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index fd1c2d4..9d9a4d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.compute;
+import java.util.concurrent.Executor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.binary.BinaryObject;
@@ -410,6 +411,11 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture chain(IgniteClosure doneCb, Executor exec) {
+ throw new UnsupportedOperationException("Chain operation is not supported.");
+ }
+
+ /** {@inheritDoc} */
@Override public Throwable error() {
return fut.error();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
index b403654..71eca65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
@@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler;
@@ -38,8 +37,6 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.ATOMIC_DECREMENT;
@@ -72,24 +69,16 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
/** Handler. */
private final GridRestProtocolHandler hnd;
- /** JDK marshaller. */
- private final Marshaller jdkMarshaller = new JdkMarshaller();
-
- /** Context. */
- private final GridKernalContext ctx;
-
/**
* Creates listener which will convert incoming tcp packets to rest requests and forward them to
* a given rest handler.
*
* @param log Logger to use.
* @param hnd Rest handler.
- * @param ctx Context.
*/
- public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd, GridKernalContext ctx) {
+ public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd) {
this.log = log;
this.hnd = hnd;
- this.ctx = ctx;
}
/** {@inheritDoc} */
@@ -462,4 +451,4 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
return new GridTuple3<>(cmd, quiet, retKey);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index 1c1c6dc..3ba6d8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -145,7 +145,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
*/
public GridTcpRestNioListener(IgniteLogger log, GridTcpRestProtocol proto, GridRestProtocolHandler hnd,
GridKernalContext ctx) {
- memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd, ctx);
+ memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd);
redisLsnr = new GridRedisNioListener(log, hnd, ctx);
this.log = log;
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index 6338fcc..2a002a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -257,6 +257,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
.logger(log)
.selectorCount(cfg.getSelectorCount())
.gridName(ctx.gridName())
+ .serverName("tcp-rest")
.tcpNoDelay(cfg.isNoDelay())
.directBuffer(cfg.isDirectBuffer())
.byteOrder(ByteOrder.nativeOrder())
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b9b92b8..9a5f077 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1131,7 +1131,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
// Start service in its own thread.
final ExecutorService exe = svcCtx.executor();
- exe.submit(new Runnable() {
+ exe.execute(new Runnable() {
@Override public void run() {
try {
svc.execute(svcCtx);
@@ -1394,7 +1394,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
return;
try {
- depExe.submit(new BusyRunnable() {
+ depExe.execute(new BusyRunnable() {
@Override public void run0() {
onSystemCacheUpdated(deps);
}
@@ -1587,7 +1587,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
else
topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
- depExe.submit(new BusyRunnable() {
+ depExe.execute(new BusyRunnable() {
@Override public void run0() {
ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e1937bb..3dfb3c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -153,7 +153,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
@@ -506,10 +505,27 @@ public abstract class IgniteUtils {
}
};
- /**
- * Initializes enterprise check.
+ /** Whether assertions are enabled; set once in the static initializer. */
+ private static final boolean assertionsEnabled;
+
+ /*
+ * Static initializer; it starts by detecting whether assertions are enabled.
+ */
static {
+ boolean assertionsEnabled0 = true;
+
+ try {
+ assert false;
+
+ assertionsEnabled0 = false;
+ }
+ catch (AssertionError ignored) {
+ assertionsEnabled0 = true;
+ }
+ finally {
+ assertionsEnabled = assertionsEnabled0;
+ }
+
String osName = System.getProperty("os.name");
String osLow = osName.toLowerCase();
@@ -1284,6 +1300,27 @@ public abstract class IgniteUtils {
}
/**
+ * @param threadId ID of the thread whose info and full stack trace are printed.
+ * @param sb Builder that receives the output.
+ */
+ public static void printStackTrace(long threadId, GridStringBuilder sb) {
+ ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+ ThreadInfo threadInfo = mxBean.getThreadInfo(threadId, Integer.MAX_VALUE); // NOTE(review): the JDK returns null here if the thread is not alive — confirm printThreadInfo() tolerates null.
+
+ printThreadInfo(threadInfo, sb, Collections.<Long>emptySet());
+ }
+
+ /**
+ * @return {@code true} if there is a Java-level deadlock, i.e. {@link ThreadMXBean#findDeadlockedThreads()} reports any threads.
+ */
+ public static boolean deadlockPresent() {
+ ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+ return !F.isEmpty(mxBean.findDeadlockedThreads());
+ }
+
+ /**
* Prints single thread info to a buffer.
*
* @param threadInfo Thread info.
@@ -6141,6 +6178,13 @@ public abstract class IgniteUtils {
}
/**
+ * @return {@code True} if assertions are enabled (the value is computed once in the static initializer).
+ */
+ public static boolean assertionsEnabled() {
+ return assertionsEnabled;
+ }
+
+ /**
* Gets OS JDK string.
*
* @return OS JDK string.
@@ -8337,6 +8381,18 @@ public abstract class IgniteUtils {
}
/**
+ * Gets absolute value for long. If argument is {@link Long#MIN_VALUE}, then {@code 0} is returned.
+ *
+ * @param i Argument.
+ * @return Absolute value, never negative.
+ */
+ public static long safeAbs(long i) {
+ i = Math.abs(i); // Math.abs(Long.MIN_VALUE) overflows and stays negative.
+
+ return i < 0 ? 0 : i;
+ }
+
+ /**
* Gets wrapper class for a primitive type.
*
* @param cls Class. If {@code null}, method is no-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
new file mode 100644
index 0000000..e9ec74b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Executor that dispatches commands to a fixed array of stripes, each with its own thread.
+ */
+public class StripedExecutor implements ExecutorService {
+ /** Stripes. */
+ private final Stripe[] stripes;
+
+ /** For starvation checks. */
+ private final long[] completedCntrs;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /**
+ * Creates and starts {@code cnt} stripes, stopping the already created ones if startup fails.
+ * Each stripe is constructed with {@code gridName}, {@code poolName}, its index and {@code log}.
+ * @param cnt Stripes count, must be greater than zero.
+ */
+ public StripedExecutor(int cnt, String gridName, String poolName, final IgniteLogger log) {
+ A.ensure(cnt > 0, "cnt > 0");
+
+ boolean success = false;
+
+ stripes = new Stripe[cnt];
+
+ completedCntrs = new long[cnt];
+
+ Arrays.fill(completedCntrs, -1); // -1 marks "no snapshot taken yet" for checkStarvation().
+
+ this.log = log;
+
+ try {
+ for (int i = 0; i < cnt; i++) {
+ stripes[i] = new StripeConcurrentQueue(
+ gridName,
+ poolName,
+ i,
+ log);
+
+ stripes[i].start();
+ }
+
+ success = true;
+ }
+ catch (Error | RuntimeException e) {
+ U.error(log, "Failed to initialize striped pool.", e);
+
+ throw e;
+ }
+ finally {
+ if (!success) { // Startup failed part-way: signal every created stripe first, then wait for each.
+ for (Stripe stripe : stripes) {
+ if (stripe != null)
+ stripe.signalStop();
+ }
+
+ for (Stripe stripe : stripes) {
+ if (stripe != null)
+ stripe.awaitStop();
+ }
+ }
+ }
+ }
+
+ /**
+ * Logs a warning for every active stripe whose completed-task counter has not changed since the previous call.
+ * The output is verbose on purpose (queue, deadlock flag, stack trace) to make possible issues faster to debug.
+ */
+ public void checkStarvation() {
+ for (int i = 0; i < stripes.length; i++) {
+ Stripe stripe = stripes[i];
+
+ long completedCnt = stripe.completedCnt;
+
+ boolean active = stripe.active;
+
+ if (completedCntrs[i] != -1 && // -1 means no snapshot has been taken yet.
+ completedCntrs[i] == completedCnt &&
+ active) {
+ boolean deadlockPresent = U.deadlockPresent();
+
+ GridStringBuilder sb = new GridStringBuilder();
+
+ sb.a(">>> Possible starvation in striped pool: ")
+ .a(stripe.thread.getName()).a(U.nl())
+ .a(stripe.queueToString()).a(U.nl())
+ .a("deadlock: ").a(deadlockPresent).a(U.nl())
+ .a("completed: ").a(completedCnt).a(U.nl());
+
+ U.printStackTrace(
+ stripe.thread.getId(),
+ sb);
+
+ String msg = sb.toString();
+
+ U.warn(log, msg);
+ }
+
+ if (active || completedCnt > 0) // Snapshot the counter for comparison on the next call.
+ completedCntrs[i] = completedCnt;
+ }
+ }
+
+ /**
+ * @return Stripes count (length of the stripes array).
+ */
+ public int stripes() {
+ return stripes.length;
+ }
+
+ /**
+ * Executes the command on the stripe at {@code idx % stripes()}, or on a randomly chosen stripe if {@code idx} is -1.
+ *
+ * @param idx Non-negative index used to pick the stripe, or {@code -1} to delegate to {@link #execute(Runnable)}.
+ * @param cmd Command.
+ */
+ public void execute(int idx, Runnable cmd) {
+ if (idx == -1)
+ execute(cmd);
+ else {
+ assert idx >= 0 : idx;
+
+ stripes[idx % stripes.length].execute(cmd);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void shutdown() {
+ signalStop(); // Only signals the stripes; waiting is done separately by awaitStop().
+ }
+
+ /** {@inheritDoc} The command is handed to a randomly chosen stripe. */
+ @Override public void execute(@NotNull Runnable cmd) {
+ stripes[ThreadLocalRandom.current().nextInt(stripes.length)].execute(cmd);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Empty list (always).
+ */
+ @NotNull @Override public List<Runnable> shutdownNow() {
+ signalStop(); // Stripes are only signalled to stop; pending commands are not collected.
+
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean awaitTermination(
+ long timeout,
+ @NotNull TimeUnit unit
+ ) throws InterruptedException {
+ awaitStop(); // NOTE(review): timeout and unit are not used; the wait is bounded only by awaitStop() — confirm this is intended.
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isShutdown() {
+ for (Stripe stripe : stripes) {
+ if (stripe != null && stripe.stopping) // Reported as shut down as soon as any stripe is stopping.
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTerminated() {
+ for (Stripe stripe : stripes) {
+ if (stripe.thread.getState() != Thread.State.TERMINATED)
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Stops executor: signals every stripe to stop, then blocks until all stripe threads have exited.
+ *
+ * @throws IgniteInterruptedException If the calling thread is interrupted while waiting.
+ */
+ public void stop() {
+ signalStop();
+
+ awaitStop();
+ }
+
+    /**
+     * Asks every stripe, in index order, to stop.
+     */
+    private void signalStop() {
+        for (int i = 0; i < stripes.length; i++)
+            stripes[i].signalStop();
+    }
+
+    /**
+     * Waits, stripe by stripe, until every stripe thread has stopped.
+     *
+     * @throws IgniteInterruptedException If interrupted.
+     */
+    private void awaitStop() throws IgniteInterruptedException {
+        for (int i = 0; i < stripes.length; i++)
+            stripes[i].awaitStop();
+    }
+
+    /**
+     * Sums the queue sizes reported by the individual stripes.
+     *
+     * @return Total queue size of all stripes.
+     */
+    public int queueSize() {
+        int total = 0;
+
+        for (int i = 0; i < stripes.length; i++)
+            total += stripes[i].queueSize();
+
+        return total;
+    }
+
+    /**
+     * Sums the completed-task counters of the individual stripes.
+     *
+     * @return Completed tasks count.
+     */
+    public long completedTasks() {
+        long total = 0;
+
+        for (int i = 0; i < stripes.length; i++)
+            total += stripes[i].completedCnt;
+
+        return total;
+    }
+
+ /**
+ * Operation not supported.
+ *
+ * @param task Ignored.
+ * @param res Ignored.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ @NotNull @Override public <T> Future<T> submit(
+ @NotNull Runnable task,
+ T res
+ ) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Operation not supported.
+ *
+ * @param task Ignored.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ @NotNull @Override public Future<?> submit(@NotNull Runnable task) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Operation not supported.
+ *
+ * @param task Ignored.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ @NotNull @Override public <T> Future<T> submit(@NotNull Callable<T> task) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Operation not supported.
+ *
+ * @param tasks Ignored.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ @NotNull @Override public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Operation not supported.
+ *
+ * @param tasks Ignored.
+ * @param timeout Ignored.
+ * @param unit Ignored.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ @NotNull @Override public <T> List<Future<T>> invokeAll(
+ @NotNull Collection<? extends Callable<T>> tasks,
+ long timeout,
+ @NotNull TimeUnit unit
+ ) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Operation not supported.
+ *
+ * @param tasks Ignored.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ @NotNull @Override public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Operation not supported.
+ *
+ * @param tasks Ignored.
+ * @param timeout Ignored.
+ * @param unit Ignored.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ @Override public <T> T invokeAny(
+ @NotNull Collection<? extends Callable<T>> tasks,
+ long timeout,
+ @NotNull TimeUnit unit
+ ) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StripedExecutor.class, this);
+ }
+
+ /**
+ * Base class for a single stripe: a worker loop that repeatedly takes a command from a
+ * subclass-provided queue and runs it on the stripe's own thread.
+ * <p>
+ * Subclasses supply the queue through {@link #execute(Runnable)}, {@link #take()},
+ * {@link #queueSize()} and {@link #queueToString()}.
+ */
+ private static abstract class Stripe implements Runnable {
+ /** Grid name, passed to the stripe thread on creation. */
+ private final String gridName;
+
+ /** Pool name, used as the prefix of the stripe thread name. */
+ private final String poolName;
+
+ /** Stripe index, used as the suffix of the stripe thread name. */
+ private final int idx;
+
+ /** Logger used to report commands that failed with an exception. */
+ private final IgniteLogger log;
+
+ /** Stopping flag. */
+ private volatile boolean stopping;
+
+ /** Number of commands that finished running (normally or with an exception); incremented in {@link #run()}. */
+ private volatile long completedCnt;
+
+ /** {@code True} while a command is being run by {@link #run()}. */
+ private volatile boolean active;
+
+ /** Thread executing the loop. */
+ protected Thread thread;
+
+ /**
+ * @param gridName Grid name.
+ * @param poolName Pool name.
+ * @param idx Stripe index.
+ * @param log Logger.
+ */
+ public Stripe(
+ String gridName,
+ String poolName,
+ int idx,
+ IgniteLogger log
+ ) {
+ this.gridName = gridName;
+ this.poolName = poolName;
+ this.idx = idx;
+ this.log = log;
+ }
+
+ /**
+ * Starts the stripe: creates a thread named {@code <poolName>-stripe-<idx>} that runs this
+ * stripe's loop and starts it.
+ */
+ void start() {
+ thread = new IgniteThread(gridName, poolName + "-stripe-" + idx, this);
+
+ thread.start();
+ }
+
+ /**
+ * Stop the stripe: raises the stopping flag and interrupts the stripe thread so that a
+ * blocking {@link #take()} can notice the request. Does not wait for the thread to exit.
+ */
+ void signalStop() {
+ stopping = true;
+
+ // NOTE(review): relies on U.interrupt tolerating a null thread if start() was never called - confirm.
+ U.interrupt(thread);
+ }
+
+ /**
+ * Await thread stop. Returns immediately if the stripe was never started.
+ *
+ * @throws IgniteInterruptedException If the calling thread is interrupted while joining;
+ * the caller's interrupt flag is restored before throwing.
+ */
+ void awaitStop() {
+ try {
+ if (thread != null)
+ thread.join();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ /**
+ * Worker loop: takes and runs commands until the stopping flag is raised or the thread
+ * is interrupted while taking a command.
+ */
+ @Override public void run() {
+ while (!stopping) {
+ Runnable cmd;
+
+ try {
+ cmd = take();
+
+ if (cmd != null) {
+ // Published before the command runs so that other threads can see the stripe is busy.
+ active = true;
+
+ try {
+ cmd.run();
+ }
+ finally {
+ // Runs even if the command throws, so a failed command still counts as completed.
+ active = false;
+ completedCnt++;
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // Interruption is treated as a stop request: mark as stopping, keep the flag set and exit.
+ stopping = true;
+
+ Thread.currentThread().interrupt();
+
+ return;
+ }
+ catch (Throwable e) {
+ // Any other failure is logged and the loop keeps serving the next command.
+ U.error(log, "Failed to execute runnable.", e);
+ }
+ }
+ }
+
+ /**
+ * Execute the command: hands it over to this stripe's queue.
+ *
+ * @param cmd Command.
+ */
+ abstract void execute(Runnable cmd);
+
+ /**
+ * Called from the worker loop to obtain the next command.
+ *
+ * @return Next runnable.
+ * @throws InterruptedException If interrupted.
+ */
+ abstract Runnable take() throws InterruptedException;
+
+ /**
+ * @return Queue size.
+ */
+ abstract int queueSize();
+
+ /**
+ * @return Stripe's queue to string presentation.
+ */
+ abstract String queueToString();
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Stripe.class, this);
+ }
+ }
+
+ /**
+ * Stripe backed by a {@link ConcurrentLinkedQueue}. The worker first spins on the queue
+ * and, if nothing arrives, parks itself until a producer unparks it.
+ */
+ private static class StripeConcurrentQueue extends Stripe {
+ /** Queue. */
+ private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+
+ /** {@code True} while the worker is in the parking phase of {@link #take()}; tells producers to unpark it. */
+ private volatile boolean parked;
+
+ /**
+ * @param gridName Grid name.
+ * @param poolName Pool name.
+ * @param idx Stripe index.
+ * @param log Logger.
+ */
+ public StripeConcurrentQueue(
+ String gridName,
+ String poolName,
+ int idx,
+ IgniteLogger log
+ ) {
+ super(gridName,
+ poolName,
+ idx,
+ log);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Polls the queue up to 2048 times without blocking, then switches to a park/re-poll loop.
+ */
+ @Override Runnable take() throws InterruptedException {
+ Runnable r;
+
+ // Spin phase: bounded number of non-blocking polls.
+ for (int i = 0; i < 2048; i++) {
+ r = queue.poll();
+
+ if (r != null)
+ return r;
+ }
+
+ // Raise the flag BEFORE the next poll: a producer that enqueues after that poll
+ // will then observe parked == true and unpark this thread.
+ parked = true;
+
+ try {
+ for (;;) {
+ r = queue.poll();
+
+ if (r != null)
+ return r;
+
+ LockSupport.park();
+
+ // park() may return because of an interrupt; convert it into an exception
+ // (Thread.interrupted() also clears the flag).
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ }
+ }
+ finally {
+ parked = false;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Enqueues the command and wakes the worker up if it is in the parking phase.
+ */
+ void execute(Runnable cmd) {
+ queue.add(cmd);
+
+ if (parked)
+ LockSupport.unpark(thread);
+ }
+
+ /** {@inheritDoc} */
+ @Override String queueToString() {
+ return String.valueOf(queue);
+ }
+
+ /** {@inheritDoc} */
+ @Override int queueSize() {
+ return queue.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StripeConcurrentQueue.class, this, super.toString());
+ }
+ }
+
+    /**
+     * Stripe backed by a {@link ConcurrentLinkedQueue} whose worker never parks: it keeps
+     * polling the queue in a busy loop until a command arrives or the thread is interrupted.
+     */
+    private static class StripeConcurrentQueueNoPark extends Stripe {
+        /** Queue. */
+        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentQueueNoPark(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Busy-spins on the queue, checking for interruption after every empty poll.
+         */
+        @Override Runnable take() throws InterruptedException {
+            for (;;) {
+                Runnable r = queue.poll();
+
+                if (r != null)
+                    return r;
+
+                // Without this check an idle stripe would spin forever after signalStop():
+                // the stopping flag is only re-read by run() once take() returns.
+                if (Thread.interrupted())
+                    throw new InterruptedException();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override void execute(Runnable cmd) {
+            queue.add(cmd);
+        }
+
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return String.valueOf(queue);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString());
+        }
+    }
+
+    /**
+     * Stripe backed by a {@link LinkedBlockingQueue}: the worker blocks inside the queue
+     * itself while waiting for the next command.
+     */
+    private static class StripeConcurrentBlockingQueue extends Stripe {
+        /** Queue. */
+        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentBlockingQueue(String gridName, String poolName, int idx, IgniteLogger log) {
+            super(gridName, poolName, idx, log);
+        }
+
+        /** {@inheritDoc} */
+        @Override void execute(Runnable cmd) {
+            queue.add(cmd);
+        }
+
+        /** {@inheritDoc} */
+        @Override Runnable take() throws InterruptedException {
+            Runnable next = queue.take();
+
+            return next;
+        }
+
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return queue.toString();
+        }
+
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index 6baedbd..dc63adc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util.future;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -152,6 +153,29 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
}
/** {@inheritDoc} */
+ @Override public <T1> IgniteInternalFuture<T1> chain(final IgniteClosure<? super IgniteInternalFuture<T>, T1> doneCb, Executor exec) {
+ final GridFutureAdapter<T1> fut = new GridFutureAdapter<>();
+
+ exec.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ fut.onDone(doneCb.apply(GridFinishedFuture.this));
+ }
+ catch (GridClosureException e) {
+ fut.onDone(e.unwrap());
+ }
+ catch (RuntimeException | Error e) {
+ fut.onDone(e);
+
+ throw e;
+ }
+ }
+ });
+
+ return fut;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridFinishedFuture.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 2cd534e..c8d85cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.future;
import java.util.Arrays;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.apache.ignite.IgniteCheckedException;
@@ -229,7 +230,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** {@inheritDoc} */
@Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
- return new ChainFuture<>(this, doneCb);
+ return new ChainFuture<>(this, doneCb, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
+ Executor exec) {
+ return new ChainFuture<>(this, doneCb, exec);
}
/**
@@ -487,15 +494,17 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/**
* @param fut Future.
* @param doneCb Closure.
+ * @param cbExec Optional executor to run callback.
*/
ChainFuture(
GridFutureAdapter<R> fut,
- IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb
+ IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
+ @Nullable Executor cbExec
) {
this.fut = fut;
this.doneCb = doneCb;
- fut.listen(new GridFutureChainListener<>(this, doneCb));
+ fut.listen(new GridFutureChainListener<>(this, doneCb, cbExec));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
index 947b2ad..367f5d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
@@ -17,15 +17,17 @@
package org.apache.ignite.internal.util.future;
+import java.util.concurrent.Executor;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
/**
* Future listener to fill chained future with converted result of the source future.
*/
-public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> {
+class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> {
/** */
private static final long serialVersionUID = 0L;
@@ -35,21 +37,43 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte
/** Done callback. */
private final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb;
+ /** */
+ private Executor cbExec;
+
/**
* Constructs chain listener.
+ *
* @param fut Target future.
* @param doneCb Done callback.
+ * @param cbExec Optional executor to run callback.
*/
public GridFutureChainListener(
GridFutureAdapter<R> fut,
- IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb
+ IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb,
+ @Nullable Executor cbExec
) {
this.fut = fut;
this.doneCb = doneCb;
+ this.cbExec = cbExec;
}
/** {@inheritDoc} */
- @Override public void apply(IgniteInternalFuture<T> t) {
+ @Override public void apply(final IgniteInternalFuture<T> t) {
+ if (cbExec != null) {
+ cbExec.execute(new Runnable() {
+ @Override public void run() {
+ applyCallback(t);
+ }
+ });
+ }
+ else
+ applyCallback(t);
+ }
+
+ /**
+ * @param t Target future.
+ */
+ private void applyCallback(IgniteInternalFuture<T> t) {
try {
fut.onDone(doneCb.apply(t));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 6820dc7..d108b56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -201,7 +201,7 @@ public class IpcToNioAdapter<T> {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
assert ses == IpcToNioAdapter.this.ses;
return send((Message)msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 9b014ec..f2ab932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -35,14 +35,24 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
/** Metrics listener. */
protected final GridNioMetricsListener metricsLsnr;
+ /** */
+ private final int connIdx;
+
/**
+ * @param connIdx Connection index.
* @param metricsLsnr Metrics listener.
*/
- protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener metricsLsnr) {
+ protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) {
+ this.connIdx = connIdx;
this.metricsLsnr = metricsLsnr;
}
/** {@inheritDoc} */
+ @Override public int connectionIndex() {
+ return connIdx;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean close() {
return reserves.compareAndSet(0, -1);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 0de54e9..71b2c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -96,15 +96,20 @@ public interface GridCommunicationClient {
/**
* @param nodeId Remote node ID. Provided only for sync clients.
* @param msg Message to send.
- * @param closure Ack closure.
+ * @param c Ack closure.
* @throws IgniteCheckedException If failed.
* @return {@code True} if should try to resend message.
*/
- public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure)
+ public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> c)
throws IgniteCheckedException;
/**
* @return {@code True} if send is asynchronous.
*/
public boolean async();
+
+ /**
+ * @return Connection index.
+ */
+ public int connectionIndex();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
index 213fd8d..7987d3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
@@ -62,13 +62,20 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
}
/** {@inheritDoc} */
- @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+ @Override public void onExceptionCaught(
+ GridNioSession ses,
+ IgniteCheckedException ex
+ ) throws IgniteCheckedException {
proceedExceptionCaught(ses, ex);
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
- return proceedSessionWrite(ses, msg);
+ @Override public GridNioFuture<?> onSessionWrite(
+ GridNioSession ses,
+ Object msg,
+ boolean fut
+ ) throws IgniteCheckedException {
+ return proceedSessionWrite(ses, msg, fut);
}
/** {@inheritDoc} */
@@ -137,4 +144,4 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
@Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
proceedSessionWriteTimeout(ses);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index 9925d2e..40c87cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -107,8 +107,12 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
- return proceedSessionWrite(ses, msg);
+ @Override public GridNioFuture<?> onSessionWrite(
+ GridNioSession ses,
+ Object msg,
+ boolean fut
+ ) throws IgniteCheckedException {
+ return proceedSessionWrite(ses, msg, fut);
}
/** {@inheritDoc} */
@@ -139,4 +143,4 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
"originalEx=" + ex + ", ex=" + e + ']');
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
index 7083ccf..343e625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
@@ -71,20 +71,27 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
}
/** {@inheritDoc} */
- @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+ @Override public void onExceptionCaught(
+ GridNioSession ses,
+ IgniteCheckedException ex
+ ) throws IgniteCheckedException {
proceedExceptionCaught(ses, ex);
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+ @Override public GridNioFuture<?> onSessionWrite(
+ GridNioSession ses,
+ Object msg,
+ boolean fut
+ ) throws IgniteCheckedException {
// No encoding needed in direct mode.
if (directMode)
- return proceedSessionWrite(ses, msg);
+ return proceedSessionWrite(ses, msg, fut);
try {
ByteBuffer res = parser.encode(ses, msg);
- return proceedSessionWrite(ses, res);
+ return proceedSessionWrite(ses, res, fut);
}
catch (IOException e) {
throw new GridNioException(e);
@@ -137,4 +144,4 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
@Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
proceedSessionWriteTimeout(ses);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
index 5f88b1f..f7928c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
@@ -105,10 +105,15 @@ public interface GridNioFilter {
*
* @param ses Session instance.
* @param msg Message to send.
- * @return Write future.
+ * @param fut {@code True} if write future should be created.
+ * @return Write future or {@code null}.
* @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
*/
- public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+ public GridNioFuture<?> proceedSessionWrite(
+ GridNioSession ses,
+ Object msg,
+ boolean fut
+ ) throws IgniteCheckedException;
/**
* Forwards session close request to the next logical filter in filter chain.
@@ -149,10 +154,11 @@ public interface GridNioFilter {
*
* @param ses Session on which message should be written.
* @param msg Message being written.
- * @return Write future.
+ * @param fut {@code True} if write future should be created.
+ * @return Write future or {@code null}.
* @throws GridNioException If GridNioException occurred while handling event.
*/
- public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+ public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
/**
* Invoked when a new messages received.
@@ -241,4 +247,4 @@ public interface GridNioFilter {
* @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
*/
public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
index 18ab1b2..58ddae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
@@ -108,10 +108,14 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+ @Override public GridNioFuture<?> proceedSessionWrite(
+ GridNioSession ses,
+ Object msg,
+ boolean fut
+ ) throws IgniteCheckedException {
checkNext();
- return nextFilter.onSessionWrite(ses, msg);
+ return nextFilter.onSessionWrite(ses, msg, fut);
}
/** {@inheritDoc} */
@@ -180,4 +184,4 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
throw new GridNioException("Failed to proceed with filter call since previous filter is not set " +
"(do you use filter outside the filter chain?): " + getClass().getName());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
index a3a74e3..8cc690b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
@@ -181,8 +181,12 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
* @return Send future.
* @throws IgniteCheckedException If IgniteCheckedException occurred while handling event.
*/
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
- return tail.onSessionWrite(ses, msg);
+ @Override public GridNioFuture<?> onSessionWrite(
+ GridNioSession ses,
+ Object msg,
+ boolean fut
+ ) throws IgniteCheckedException {
+ return tail.onSessionWrite(ses, msg, fut);
}
/**
@@ -255,9 +259,9 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg)
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut)
throws IgniteCheckedException {
- return proceedSessionWrite(ses, msg);
+ return proceedSessionWrite(ses, msg, fut);
}
/** {@inheritDoc} */
@@ -290,4 +294,4 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
return proceedResumeReads(ses);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index b02acc8..6c0c9c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -45,9 +45,9 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
/**
* Sets ack closure which will be applied when ack received.
*
- * @param closure Ack closure.
+ * @param c Ack closure.
*/
- public void ackClosure(IgniteInClosure<IgniteException> closure);
+ public void ackClosure(IgniteInClosure<IgniteException> c);
/**
* The method will be called when ack received.
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 35480ac..6258c13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -35,8 +35,8 @@ public class GridNioRecoveryDescriptor {
/** Number of acknowledged messages. */
private long acked;
- /** Unacknowledged message futures. */
- private final ArrayDeque<GridNioFuture<?>> msgFuts;
+ /** Unacknowledged messages. */
+ private final ArrayDeque<SessionWriteRequest> msgReqs;
/** Number of messages to resend. */
private int resendCnt;
@@ -77,23 +77,40 @@ public class GridNioRecoveryDescriptor {
/** Number of descriptor reservations (for info purposes). */
private int reserveCnt;
+ /** */
+ private final boolean pairedConnections;
+
/**
+ * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
* @param queueLimit Maximum size of unacknowledged messages queue.
* @param node Node.
* @param log Logger.
*/
- public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, IgniteLogger log) {
+ public GridNioRecoveryDescriptor(
+ boolean pairedConnections,
+ int queueLimit,
+ ClusterNode node,
+ IgniteLogger log
+ ) {
assert !node.isLocal() : node;
assert queueLimit > 0;
- msgFuts = new ArrayDeque<>(queueLimit);
+ msgReqs = new ArrayDeque<>(queueLimit);
+ this.pairedConnections = pairedConnections;
this.queueLimit = queueLimit;
this.node = node;
this.log = log;
}
/**
+ * @return {@code True} if in/out connections pair is used for communication with node.
+ */
+ public boolean pairedConnections() {
+ return pairedConnections;
+ }
+
+ /**
* @return Connect count.
*/
public long incrementConnectCount() {
@@ -154,19 +171,19 @@ public class GridNioRecoveryDescriptor {
}
/**
- * @param fut NIO future.
+ * @param req Write request.
* @return {@code False} if queue limit is exceeded.
*/
- public boolean add(GridNioFuture<?> fut) {
- assert fut != null;
+ public boolean add(SessionWriteRequest req) {
+ assert req != null;
- if (!fut.skipRecovery()) {
+ if (!req.skipRecovery()) {
if (resendCnt == 0) {
- msgFuts.addLast(fut);
+ msgReqs.addLast(req);
sentCnt++;
- return msgFuts.size() < queueLimit;
+ return msgReqs.size() < queueLimit;
}
else
resendCnt--;
@@ -181,21 +198,19 @@ public class GridNioRecoveryDescriptor {
public void ackReceived(long rcvCnt) {
if (log.isDebugEnabled())
log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt +
- ", msgFuts=" + msgFuts.size() + ']');
+ ", msgReqs=" + msgReqs.size() + ']');
while (acked < rcvCnt) {
- GridNioFuture<?> fut = msgFuts.pollFirst();
+ SessionWriteRequest req = msgReqs.pollFirst();
- assert fut != null : "Missed message future [rcvCnt=" + rcvCnt +
+ assert req != null : "Missed message [rcvCnt=" + rcvCnt +
", acked=" + acked +
", desc=" + this + ']';
- assert fut.isDone() : fut;
-
- if (fut.ackClosure() != null)
- fut.ackClosure().apply(null);
+ if (req.ackClosure() != null)
+ req.ackClosure().apply(null);
- fut.onAckReceived();
+ req.onAckReceived();
acked++;
}
@@ -214,7 +229,7 @@ public class GridNioRecoveryDescriptor {
* @return {@code False} if descriptor is reserved.
*/
public boolean onNodeLeft() {
- GridNioFuture<?>[] futs = null;
+ SessionWriteRequest[] reqs = null;
synchronized (this) {
nodeLeft = true;
@@ -222,24 +237,24 @@ public class GridNioRecoveryDescriptor {
if (reserved)
return false;
- if (!msgFuts.isEmpty()) {
- futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+ if (!msgReqs.isEmpty()) {
+ reqs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]);
- msgFuts.clear();
+ msgReqs.clear();
}
}
- if (futs != null)
- completeOnNodeLeft(futs);
+ if (reqs != null)
+ notifyOnNodeLeft(reqs);
return true;
}
/**
- * @return Message futures for unacknowledged messages.
+ * @return Requests for unacknowledged messages.
*/
- public Deque<GridNioFuture<?>> messagesFutures() {
- return msgFuts;
+ public Deque<SessionWriteRequest> messagesRequests() {
+ return msgReqs;
}
/**
@@ -277,14 +292,14 @@ public class GridNioRecoveryDescriptor {
if (!nodeLeft)
ackReceived(rcvCnt);
- resendCnt = msgFuts.size();
+ resendCnt = msgReqs.size();
}
}
/**
*
*/
- public void connected() {
+ public void onConnected() {
synchronized (this) {
assert reserved : this;
assert !connected : this;
@@ -306,10 +321,37 @@ public class GridNioRecoveryDescriptor {
}
/**
+ * @return Connected flag.
+ */
+ public boolean connected() {
+ synchronized (this) {
+ return connected;
+ }
+ }
+
+ /**
+ * @return Reserved flag.
+ */
+ public boolean reserved() {
+ synchronized (this) {
+ return reserved;
+ }
+ }
+
+ /**
+ * @return Current handshake index.
+ */
+ public Long handshakeIndex() {
+ synchronized (this) {
+ return handshakeReq != null ? handshakeReq.get1() : null;
+ }
+ }
+
+ /**
*
*/
public void release() {
- GridNioFuture<?>[] futs = null;
+ SessionWriteRequest[] futs = null;
synchronized (this) {
connected = false;
@@ -329,15 +371,15 @@ public class GridNioRecoveryDescriptor {
notifyAll();
}
- if (nodeLeft && !msgFuts.isEmpty()) {
- futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+ if (nodeLeft && !msgReqs.isEmpty()) {
+ futs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]);
- msgFuts.clear();
+ msgReqs.clear();
}
}
if (futs != null)
- completeOnNodeLeft(futs);
+ notifyOnNodeLeft(futs);
}
/**
@@ -398,16 +440,16 @@ public class GridNioRecoveryDescriptor {
}
/**
- * @param futs Futures to complete.
+ * @param reqs Requests to notify about error.
*/
- private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
- for (GridNioFuture<?> msg : futs) {
- IOException e = new IOException("Failed to send message, node has left: " + node.id());
+ private void notifyOnNodeLeft(SessionWriteRequest[] reqs) {
+ IOException e = new IOException("Failed to send message, node has left: " + node.id());
- ((GridNioFutureImpl)msg).onDone(e);
+ for (SessionWriteRequest req : reqs) {
+ req.onError(e);
- if (msg.ackClosure() != null)
- msg.ackClosure().apply(new IgniteException(e));
+ if (req.ackClosure() != null)
+ req.ackClosure().apply(new IgniteException(e));
}
}