You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/02/08 14:41:29 UTC
[ignite] branch master updated: IGNITE-14112 Revisit usages of GridClosureProcessor.runLocalSafe and GridClosureProcessor.callLocalSafe methods - Fixes #8743.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 21f992a IGNITE-14112 Revisit usages of GridClosureProcessor.runLocalSafe and GridClosureProcessor.callLocalSafe methods - Fixes #8743.
21f992a is described below
commit 21f992a6958529cafd389505a05ad3506a565858
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Feb 8 17:40:13 2021 +0300
IGNITE-14112 Revisit usages of GridClosureProcessor.runLocalSafe and GridClosureProcessor.callLocalSafe methods - Fixes #8743.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../ignite/internal/IgniteSchedulerImpl.java | 4 +-
.../processors/cache/CacheGroupContext.java | 3 +-
.../processors/cache/GridCacheAdapter.java | 28 ++++++------
.../processors/cache/GridCacheIoManager.java | 3 +-
.../processors/cache/GridCacheMvccManager.java | 2 +-
.../cache/GridCachePartitionExchangeManager.java | 3 +-
.../cache/GridDeferredAckMessageSender.java | 3 +-
.../distributed/GridCacheTxRecoveryFuture.java | 3 +-
.../cache/distributed/dht/GridDhtCacheAdapter.java | 2 +-
.../dht/GridDhtTxAbstractEnlistFuture.java | 3 +-
.../dht/GridPartitionedSingleGetFuture.java | 2 +-
.../atomic/GridNearAtomicSingleUpdateFuture.java | 5 ++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 5 ++-
.../dht/preloader/GridDhtPartitionDemander.java | 5 ++-
.../preloader/GridDhtPartitionsExchangeFuture.java | 4 +-
.../dht/preloader/latch/ExchangeLatchManager.java | 3 +-
.../cache/distributed/near/GridNearTxLocal.java | 3 +-
.../processors/cache/local/GridLocalCache.java | 6 +--
.../cache/local/atomic/GridLocalAtomicCache.java | 9 ++--
.../processors/cache/mvcc/MvccProcessorImpl.java | 3 +-
.../query/GridCacheDistributedQueryFuture.java | 6 +--
.../query/GridCacheDistributedQueryManager.java | 4 +-
.../continuous/CacheContinuousQueryHandler.java | 3 +-
.../cache/transactions/IgniteTxManager.java | 5 ++-
.../PartitionCountersNeighborcastFuture.java | 3 +-
.../processors/cluster/ClusterProcessor.java | 5 ++-
.../cluster/GridClusterStateProcessor.java | 3 +-
.../continuous/GridContinuousProcessor.java | 3 +-
.../datastreamer/DataStreamProcessor.java | 3 +-
.../processors/datastreamer/DataStreamerImpl.java | 5 ++-
.../datastructures/DataStructuresProcessor.java | 5 ++-
.../marshaller/GridMarshallerMappingProcessor.java | 3 +-
.../PerformanceStatisticsProcessor.java | 3 +-
.../handlers/cache/GridCacheCommandHandler.java | 5 ++-
.../DataStructuresCommandHandler.java | 6 +--
.../rest/handlers/query/QueryCommandHandler.java | 8 ++--
.../processors/service/GridServiceProcessor.java | 3 +-
.../processors/service/ServiceDeploymentTask.java | 3 +-
.../internal/processors/task/GridTaskWorker.java | 5 ++-
.../internal/visor/query/VisorQueryUtils.java | 5 ++-
.../ignite/p2p/GridP2PLocalDeploymentSelfTest.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 50 +++++++++++-----------
42 files changed, 136 insertions(+), 101 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
index 804c0ff..6930d5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
@@ -30,6 +30,8 @@ import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.scheduler.SchedulerFuture;
@@ -181,7 +183,7 @@ public class IgniteSchedulerImpl implements IgniteScheduler, Externalizable {
}
/** */
- private class SecurityAwareClosure<T> implements Runnable, Callable<T>, GridInternalWrapper<Object> {
+ private class SecurityAwareClosure<T> implements GridPlainRunnable, GridPlainCallable<T>, GridInternalWrapper<Object> {
/** Security subject id. */
private final UUID secSubjId;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 1994457..8492982 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipC
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -1027,7 +1028,7 @@ public class CacheGroupContext {
final List<Runnable> procC = skipCtx != null ? skipCtx.processClosures() : null;
if (procC != null) {
- ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ ctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
for (Runnable c : procC)
c.run();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index ff80d43..2eddcad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -122,6 +122,8 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
@@ -1257,8 +1259,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Clear future.
*/
private IgniteInternalFuture<?> clearLocallyAsync(@Nullable final Set<? extends K> keys) {
- return ctx.closures().callLocalSafe(new Callable<Object>() {
- @Override public Object call() throws Exception {
+ return ctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
+ @Override public Object call() {
if (keys == null)
clearLocally(true, false, false);
else
@@ -3876,7 +3878,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Override public IgniteInternalFuture<?> localLoadCacheAsync(final IgniteBiPredicate<K, V> p,
final Object[] args) {
return ctx.closures().callLocalSafe(
- ctx.projectSafe(new Callable<Object>() {
+ ctx.projectSafe(new GridPlainCallable<Object>() {
@Nullable @Override public Object call() throws IgniteCheckedException {
localLoadCache(p, args);
@@ -4588,7 +4590,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
(IgniteOutClosure<IgniteInternalFuture>)() -> {
GridFutureAdapter resFut = new GridFutureAdapter();
- ctx.kernalContext().closure().runLocalSafe(() -> {
+ ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> {
IgniteInternalFuture fut0;
if (ctx.kernalContext().isStopping())
@@ -5135,7 +5137,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
assert orig == null || orig.optimistic() || orig.readCommitted() || /*contains*/ skipVals;
// Async check and recover if necessary.
- return ctx.kernalContext().closure().callLocalSafe(new Callable<Void>() {
+ return ctx.kernalContext().closure().callLocalSafe(new GridPlainCallable<Void>() {
@Override public Void call() throws IgniteCheckedException {
CacheOperationContext prevOpCtx = ctx.operationContextPerCall();
@@ -5365,12 +5367,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException {
if (isLocal()) {
- return ctx.kernalContext().closure().runLocalSafe(() -> {
- try {
- ctx.offheap().preloadPartition(part);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ return ctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+ @Override public void run() {
+ try {
+ ctx.offheap().preloadPartition(part);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
});
}
@@ -6840,7 +6844,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
- ignite.context().closure().runLocalSafe(new Runnable() {
+ ignite.context().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
jobCtx.callcc();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 78a330e..2d28b5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -210,7 +211,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
handleMessage(nodeId, cacheMsg, plc);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 33e15a6..d9b38b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -361,7 +361,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
*/
public void removeExplicitNodeLocks(UUID leftNodeId) {
cctx.kernalContext().closure().runLocalSafe(
- new Runnable() {
+ new GridPlainRunnable() {
@Override public void run() {
for (GridDistributedCacheEntry entry : locked()) {
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a55e88a..126a27e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -134,6 +134,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -3688,7 +3689,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** {@inheritDoc} */
@Override public void onTimeout() {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
if (!busyLock.readLock().tryLock())
return;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 840bfd5..de0f1c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.util.deque.FastSizeDeque;
@@ -153,7 +154,7 @@ public abstract class GridDeferredAckMessageSender<T> {
/** {@inheritDoc} */
@Override public void onTimeout() {
if (guard.compareAndSet(false, true)) {
- c.runLocalSafe(new Runnable() {
+ c.runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
writeLock().lock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 3fc053d..64034ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -459,7 +460,7 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
final MiniFuture f = (MiniFuture)fut;
if (f.nodeId().equals(nodeId)) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
f.onNodeLeft(nodeId);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index b52ec45..4bfb4fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1093,7 +1093,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param incomingReq Original ttl request.
*/
private void sendTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest incomingReq) {
- ctx.closures().runLocalSafe(new Runnable() {
+ ctx.closures().runLocalSafe(new GridPlainRunnable() {
@SuppressWarnings({"ForLoopReplaceableByForEach"})
@Override public void run() {
Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 21b895d..08843bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -1035,7 +1036,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
if (nearNodeId.equals(nodeId))
onDone(new ClusterTopologyCheckedException("Requesting node left the grid [nodeId=" + nodeId + ']'));
else if (pending != null && pending.remove(nodeId) != null)
- cctx.kernalContext().closure().runLocalSafe(() -> continueLoop(false));
+ cctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> continueLoop(false));
}
catch (Exception e) {
onDone(e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 1dcf4e4..110b4e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -918,7 +918,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
* @param topVer Topology version.
*/
private void remap(final AffinityTopologyVersion topVer) {
- cctx.closures().runLocalSafe(new Runnable() {
+ cctx.closures().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
// If topology changed reset collection of invalid nodes.
synchronized (this) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 18d2684..29a2645 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -373,7 +374,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
mapOnTopology();
}
@@ -432,7 +433,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
mapOnTopology();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 6045b96..fb37a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -498,7 +499,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
mapOnTopology();
}
@@ -652,7 +653,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
mapOnTopology();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index bae41ca..7c022ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridIterableAdapter;
import org.apache.ignite.internal.util.lang.GridIterableAdapter.IteratorWrapper;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -1362,14 +1363,14 @@ public class GridDhtPartitionDemander {
}
if (waitCnt.decrementAndGet() == 0)
- ctx.kernalContext().closure().runLocalSafe(() -> requestPartitions0(node, parts, d));
+ ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> requestPartitions0(node, parts, d));
}
});
}
// The special case for historical only rebalancing.
if (d.partitions().fullSet().isEmpty() && !d.partitions().historicalSet().isEmpty())
- ctx.kernalContext().closure().runLocalSafe(() -> requestPartitions0(node, parts, d));
+ ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> requestPartitions0(node, parts, d));
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 65e4b4d..b49dd73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -32,7 +32,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -119,6 +118,7 @@ import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -5174,7 +5174,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert newCrdFut != null;
- cctx.kernalContext().closure().callLocal(new Callable<Void>() {
+ cctx.kernalContext().closure().callLocal(new GridPlainCallable<Void>() {
@Override public Void call() throws Exception {
try {
newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index e09f8a0..a9a8fb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -134,7 +135,7 @@ public class ExchangeLatchManager {
// Do not process from discovery thread.
// TODO: Should use queue to guarantee the order of processing left nodes.
- ctx.closure().runLocalSafe(() -> processNodeLeft(cache.version(), e.eventNode()));
+ ctx.closure().runLocalSafe((GridPlainRunnable)() -> processNodeLeft(cache.version(), e.eventNode()));
}, EVT_NODE_LEFT, EVT_NODE_FAILED);
ctx.event().addDiscoveryEventListener((e, cache) -> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f1f99d3..babdf80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridInClosure3;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
@@ -5159,7 +5160,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
if (proceed || (state() == MARKED_ROLLBACK)) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
// Note: if rollback asynchronously on timeout should not clear thread map
// since thread started tx still should be able to see this tx.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index d33aff2..7e91fbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.local;
import java.io.Externalizable;
import java.util.Collection;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -37,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -189,7 +189,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
- return ctx.closures().callLocalSafe(new Callable<Void>() {
+ return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() {
@Override public Void call() throws Exception {
removeAll();
@@ -240,7 +240,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException {
- return ctx.closures().callLocalSafe(new Callable<Void>() {
+ return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() {
@Override public Void call() throws Exception {
preloadPartition(part);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 982e296..81ee613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.resource.GridResourceIoc;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -286,7 +287,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
- return ctx.closures().callLocalSafe(new Callable<Void>() {
+ return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() {
@Override public Void call() throws Exception {
removeAll();
@@ -350,7 +351,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final boolean storeEnabled = ctx.readThrough();
- return asyncOp(new Callable<Map<K, V>>() {
+ return asyncOp(new GridPlainCallable<Map<K, V>>() {
@Override public Map<K, V> call() throws Exception {
return getAllInternal(keys, storeEnabled, taskName, deserializeBinary, skipVals, needVer);
}
@@ -775,7 +776,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- return asyncOp(new Callable<Object>() {
+ return asyncOp(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(op,
keys,
@@ -817,7 +818,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- return asyncOp(new Callable<Object>() {
+ return asyncOp(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(DELETE,
keys,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 9fbf83c..da9aab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -654,7 +655,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
// Complete init future if local node is a new coordinator. All previous txs have been already completed here.
if (curCrd0.local())
- ctx.closure().runLocalSafe(initFut::onDone);
+ ctx.closure().runLocalSafe((GridPlainRunnable)initFut::onDone);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 0019df0..8f08dea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -22,13 +22,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -105,8 +105,8 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
cctx.deploymentEnabled());
// Process cancel query directly (without sending) for local node,
- cctx.closures().callLocalSafe(new Callable<Object>() {
- @Override public Object call() throws Exception {
+ cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
+ @Override public Object call() {
qryMgr.processQueryRequest(cctx.localNodeId(), req);
return null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 5474fc5..08fbfe8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
@@ -43,6 +42,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
@@ -894,7 +894,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
if (locNode != null) {
- cctx.closures().callLocalSafe(new Callable<Object>() {
+ cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
req.beforeLocalExecution(cctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ee0e182..1573e8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -1341,7 +1342,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
final UUID routineId,
final GridKernalContext ctx) {
if (t != null) {
- ctx.closure().runLocalSafe(new Runnable() {
+ ctx.closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
GridCacheContext<K, V> cctx = cacheContext(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a561b9b..fb3a892 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -2184,7 +2185,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
scheduleDumpTask(
IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT,
() -> cctx.kernalContext().closure().runLocalSafe(
- () -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout)),
+ (GridPlainRunnable)() -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout)),
longOpsDumpTimeout);
}
@@ -3240,7 +3241,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/**
* Transactions recovery initialization runnable.
*/
- private final class TxRecoveryInitRunnable implements Runnable {
+ private final class TxRecoveryInitRunnable implements GridPlainRunnable {
/** */
private final ClusterNode node;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
index 39b79d4..a4ac865 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -174,7 +175,7 @@ public class PartitionCountersNeighborcastFuture extends GridCacheCompoundIdenti
MiniFuture mini = (MiniFuture)fut;
if (mini.nodeId.equals(nodeId)) {
- cctx.kernalContext().closure().runLocalSafe(mini::onDone);
+ cctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)mini::onDone);
return;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 1f2a107..7625368 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.util.GridTimerTask;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -240,7 +241,7 @@ public class ClusterProcessor extends GridProcessorAdapter implements Distribute
", previous value was " +
oldVal.tag();
- ctx.closure().runLocalSafe(() -> ctx.event().record(
+ ctx.closure().runLocalSafe((GridPlainRunnable)() -> ctx.event().record(
new ClusterTagUpdatedEvent(
ctx.discovery().localNode(),
msg,
@@ -278,7 +279,7 @@ public class ClusterProcessor extends GridProcessorAdapter implements Distribute
this.metastorage = metastorage;
ctx.closure().runLocalSafe(
- () -> {
+ (GridPlainRunnable)() -> {
try {
ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), cluster.tag());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 085d38c..a774d520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -1412,7 +1413,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
checkLocalNodeInBaseline(globalState.baselineTopology());
- ctx.closure().runLocalSafe(new Runnable() {
+ ctx.closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
boolean client = ctx.clientNode();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 5e31c41..293225a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -702,7 +703,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (ctx.config().isPeerClassLoadingEnabled()) {
// Peer class loading cannot be performed before a node joins, so we delay the deployment.
// Run the deployment task in the system pool to avoid blocking of the discovery thread.
- ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe(() -> {
+ ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe((GridPlainRunnable)() -> {
try {
hnd.p2pUnmarshal(srcNodeId, ctx);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 9069318..0e4f463 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -233,7 +234,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
- ctx.closure().runLocalSafe(new Runnable() {
+ ctx.closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
processRequest(nodeId, req);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a6d7182..7c24263 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -331,7 +332,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Only async notification is possible since
// discovery thread may be trapped otherwise.
if (buf != null) {
- waitAffinityAndRun(new Runnable() {
+ waitAffinityAndRun(new GridPlainRunnable() {
@Override public void run() {
buf.onNodeLeft();
}
@@ -1068,7 +1069,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (bufMappings.remove(nodeId, buf)) {
final Buffer buf0 = buf;
- waitAffinityAndRun(new Runnable() {
+ waitAffinityAndRun(new GridPlainRunnable() {
@Override public void run() {
buf0.onNodeLeft();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 232f5fc..6a8c400 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -148,8 +149,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
// This may require cache operation to execute,
// therefore cannot use event notification thread.
ctx.closure().callLocalSafe(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
+ new GridPlainCallable<Object>() {
+ @Override public Object call() {
DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
UUID leftNodeId = discoEvt.eventNode().id();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 663a3f7..bb8b5e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -304,7 +305,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
final MarshallerMappingItem item = msg.getMappingItem();
marshallerCtx.onMappingAccepted(item);
- closProc.runLocalSafe(new Runnable() {
+ closProc.runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs)
lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
index f648114..eec7c1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
@@ -232,7 +233,7 @@ public class PerformanceStatisticsProcessor extends GridProcessorAdapter {
/** Starts or stops collecting statistics on metastorage update. */
private void onMetastorageUpdate(boolean start) {
- ctx.closure().runLocalSafe(() -> {
+ ctx.closure().runLocalSafe((GridPlainRunnable)() -> {
if (start)
startWriter();
else
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 22e9d51..c45f77e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.IgniteClosure2X;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
@@ -218,7 +219,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
if (val == null)
throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val"));
- return ctx.closure().callLocalSafe(new Callable<Object>() {
+ return ctx.closure().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
EntryProcessorResult<Boolean> res = cache.invoke(key, new EntryProcessor<Object, Object, Boolean>() {
@Override public Boolean process(MutableEntry<Object, Object> entry,
@@ -1729,7 +1730,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
GridKernalContext ctx) {
assert c != null;
- return ctx.closure().callLocalSafe(new Callable<Object>() {
+ return ctx.closure().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
EntryProcessorResult<Boolean> res = c.invoke(key, new EntryProcessor<Object, Object, Boolean>() {
@Override public Boolean process(MutableEntry<Object, Object> entry,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java
index df4aabc..fb87c12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.rest.handlers.datastructures;
import java.util.Collection;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
@@ -29,6 +28,7 @@ import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandle
import org.apache.ignite.internal.processors.rest.request.DataStructuresRequest;
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -97,8 +97,8 @@ public class DataStructuresCommandHandler extends GridRestCommandHandlerAdapter
return new GridFinishedFuture(err);
}
- return ctx.closure().callLocalSafe(new Callable<Object>() {
- @Override public Object call() throws Exception {
+ return ctx.closure().callLocalSafe(new GridPlainCallable<Object>() {
+ @Override public Object call() {
Long init = req.initial();
Long delta = req.delta();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index 384ba35..cdb92dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -47,6 +46,7 @@ import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandle
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.processors.rest.request.RestQueryRequest;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -261,7 +261,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/**
* Execute query callable.
*/
- private static class ExecuteQueryCallable implements Callable<GridRestResponse> {
+ private static class ExecuteQueryCallable implements GridPlainCallable<GridRestResponse> {
/** Kernal context. */
private GridKernalContext ctx;
@@ -401,7 +401,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/**
* Close query callable.
*/
- private static class CloseQueryCallable implements Callable<GridRestResponse> {
+ private static class CloseQueryCallable implements GridPlainCallable<GridRestResponse> {
/** Current queries cursors. */
private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
@@ -452,7 +452,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/**
* Fetch query callable.
*/
- private static class FetchQueryCallable implements Callable<GridRestResponse> {
+ private static class FetchQueryCallable implements GridPlainCallable<GridRestResponse> {
/** Current queries cursors. */
private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index c0f624f..4836991 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -257,7 +258,7 @@ public class GridServiceProcessor extends ServiceProcessorAdapter implements Ign
else { // Listener for client nodes is registered in onContinuousProcessorStarted method.
assert !ctx.isDaemon();
- ctx.closure().runLocalSafe(new Runnable() {
+ ctx.closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
try {
Iterable<CacheEntryEvent<?, ?>> entries =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index 7778dc2..29cd6ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -457,7 +458,7 @@ class ServiceDeploymentTask {
if (isCompleted())
return;
- ctx.closure().runLocalSafe(() -> {
+ ctx.closure().runLocalSafe((GridPlainRunnable)() -> {
try {
ServiceDeploymentActions depResults = msg.servicesDeploymentActions();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 1b74977..539676c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
import org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -1005,7 +1006,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
if (waitForAffTop && affFut != null) {
affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut0) {
- ctx.closure().runLocalSafe(new Runnable() {
+ ctx.closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
onResponse(failoverRes);
}
@@ -1024,7 +1025,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
private void sendRetryRequest(final long waitms, final GridJobResultImpl jRes, final GridJobExecuteResponse resp) {
ctx.timeout().schedule(new Runnable() {
@Override public void run() {
- ctx.closure().runLocalSafe(new Runnable() {
+ ctx.closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
try {
ClusterNode newNode = ctx.affinity().mapPartitionToNode(affCacheName, affPartId,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
index 7979d87..9e19fb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -360,7 +361,7 @@ public class VisorQueryUtils {
final VisorQueryTaskArg arg,
final GridQueryCancel cancel
) {
- ignite.context().closure().runLocalSafe(() -> {
+ ignite.context().closure().runLocalSafe((GridPlainRunnable)() -> {
try {
SqlFieldsQuery qry = new SqlFieldsQuery(arg.getQueryText());
@@ -440,7 +441,7 @@ public class VisorQueryUtils {
final VisorQueryHolder holder,
final VisorScanQueryTaskArg arg
) {
- ignite.context().closure().runLocalSafe(() -> {
+ ignite.context().closure().runLocalSafe((GridPlainRunnable)() -> {
try {
IgniteCache<Object, Object> c = ignite.cache(arg.getCacheName());
String filterText = arg.getFilter();
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java
index 6b2dbd4..4459e47 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java
@@ -257,7 +257,7 @@ public class GridP2PLocalDeploymentSelfTest extends GridCommonAbstractTest {
@Override public void run() {
stop.set(true);
}
- }, 10, TimeUnit.SECONDS);
+ }, 5, TimeUnit.SECONDS);
fut.get();
} finally {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7d0d4ee..4e7b163 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -74,6 +73,7 @@ import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanType;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -237,32 +237,32 @@ public class GridMapQueryExecutor {
Span span = MTC.span();
ctx.closure().callLocal(
- (Callable<Void>)() -> {
+ (GridPlainCallable<Void>)() -> {
try (TraceSurroundings ignored = MTC.supportContinual(span)) {
- onQueryRequest0(node,
- req.requestId(),
- segment,
- req.schemaName(),
- req.queries(),
- cacheIds,
- req.topologyVersion(),
- partsMap,
- parts,
- req.pageSize(),
- distributedJoins,
- enforceJoinOrder,
- false,
- timeout,
- params,
- lazy,
- req.mvccSnapshot(),
- dataPageScanEnabled,
- treatReplicatedAsPartitioned
- );
+ onQueryRequest0(node,
+ req.requestId(),
+ segment,
+ req.schemaName(),
+ req.queries(),
+ cacheIds,
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.pageSize(),
+ distributedJoins,
+ enforceJoinOrder,
+ false,
+ timeout,
+ params,
+ lazy,
+ req.mvccSnapshot(),
+ dataPageScanEnabled,
+ treatReplicatedAsPartitioned
+ );
- return null;
- }
- },
+ return null;
+ }
+ },
QUERY_POOL);
}