You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/17 20:54:50 UTC
[1/7] ignite git commit: IGNITE-1498 Added support for -cfg=path
command line option.
Repository: ignite
Updated Branches:
refs/heads/ignite-1171 6aa0ee16c -> 8493576c1
IGNITE-1498 Added support for -cfg=path command line option.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd6a1d51
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd6a1d51
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd6a1d51
Branch: refs/heads/ignite-1171
Commit: cd6a1d513adab5a24b33a9315bfd083c2f4383ee
Parents: 6187241
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Sep 17 14:33:00 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Sep 17 14:33:00 2015 +0700
----------------------------------------------------------------------
.../ignite/visor/commands/VisorConsole.scala | 31 +++++++++++++-------
1 file changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd6a1d51/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
index bcfc6e0..627f795 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
@@ -84,13 +84,14 @@ class VisorConsole {
if (hasArgFlag("?", argLst) || hasArgFlag("help", argLst)) {
println("Usage:")
- println(s" $progName [?]|[{-v}{-np}]|[{-b=<batch commands file path>} {-e=command1;command2}]")
+ println(s" $progName [? | -help]|[{-v}{-np} {-cfg=<path>}]|[{-b=<path>} {-e=command1;command2;...}]")
println(" Where:")
- println(" ?, /help, -help - show this message.")
- println(" -v - verbose mode (quiet by default).")
- println(" -np - no pause on exit (pause by default)")
- println(" -b - batch mode with file)")
- println(" -e - batch mode with commands)")
+ println(" ?, /help, -help - show this message.")
+ println(" -v - verbose mode (quiet by default).")
+ println(" -np - no pause on exit (pause by default).")
+ println(" -cfg=<path> - connect with specified configuration.")
+ println(" -b=<path> - batch mode with file.")
+ println(" -e==cmd1;cmd2;... - batch mode with commands.")
visor.quit()
}
@@ -99,14 +100,22 @@ class VisorConsole {
}
protected def buildReader(argLst: ArgList) = {
+ val cfgFile = argValue("cfg", argLst)
val batchFile = argValue("b", argLst)
val batchCommand = argValue("e", argLst)
+ cfgFile.foreach(cfg => {
+ if (batchFile.isDefined || batchCommand.isDefined) {
+ visor.warn("Options can't contains both -cfg and one of -b or -e options.")
+
+ visor.quit()
+ }
+
+ visor.searchCmd("open").foreach(_.withArgs("-cpath=" + cfg))
+ })
+
if (batchFile.isDefined && batchCommand.isDefined) {
- visor.warn(
- "Illegal options can't contains both command file and commands",
- s"Usage: $progName {-b=<batch commands file path>} {-e=command1;command2}"
- )
+ visor.warn("Options can't contains both command file and commands.")
visor.quit()
}
@@ -303,7 +312,7 @@ object VisorConsole extends VisorConsole with App {
addCommands()
private val argLst = parse(args.mkString(" "))
-
+
private val reader = buildReader(argLst)
visor.reader(reader)
[6/7] ignite git commit: minor
Posted by ag...@apache.org.
minor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d64fc9d1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d64fc9d1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d64fc9d1
Branch: refs/heads/ignite-1171
Commit: d64fc9d105c66c08234d7bdf72046128456620a5
Parents: 3676cbe
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Sep 17 18:03:57 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Sep 17 18:03:57 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d64fc9d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index d9f6840..0cbad48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -418,4 +418,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateFuture.class, this);
}
-}
\ No newline at end of file
+}
[3/7] ignite git commit: ignite-1.4 Fixed Visor cmd options.
Posted by ag...@apache.org.
ignite-1.4 Fixed Visor cmd options.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8c0b308
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8c0b308
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8c0b308
Branch: refs/heads/ignite-1171
Commit: b8c0b308a7f02f0495315e280936f6bacd170e44
Parents: 5cfb6e6
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Sep 17 17:17:05 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Sep 17 17:17:05 2015 +0700
----------------------------------------------------------------------
.../scala/org/apache/ignite/visor/commands/VisorConsole.scala | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8c0b308/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
index 2abe8a7..6d91b05 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
@@ -105,6 +105,12 @@ class VisorConsole {
val batchCommand = argValue("e", argLst)
cfgFile.foreach(cfg => {
+ if (cfg.trim.isEmpty) {
+ visor.warn("Expected path to configuration after \"-cfg\" option.")
+
+ visor.quit()
+ }
+
if (batchFile.isDefined || batchCommand.isDefined) {
visor.warn("Options can't contains both -cfg and one of -b or -e options.")
[2/7] ignite git commit: ignite-1.4 Fixed Visor cmd options.
Posted by ag...@apache.org.
ignite-1.4 Fixed Visor cmd options.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5cfb6e68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5cfb6e68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5cfb6e68
Branch: refs/heads/ignite-1171
Commit: 5cfb6e6878dea2fa78d7593766035a5b535763a1
Parents: cd6a1d5
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Sep 17 16:24:25 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Sep 17 16:24:25 2015 +0700
----------------------------------------------------------------------
.../main/scala/org/apache/ignite/visor/commands/VisorConsole.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cfb6e68/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
index 627f795..2abe8a7 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
@@ -91,7 +91,7 @@ class VisorConsole {
println(" -np - no pause on exit (pause by default).")
println(" -cfg=<path> - connect with specified configuration.")
println(" -b=<path> - batch mode with file.")
- println(" -e==cmd1;cmd2;... - batch mode with commands.")
+ println(" -e=cmd1;cmd2;... - batch mode with commands.")
visor.quit()
}
[7/7] ignite git commit: Merge branch 'ignite-1.4' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171
Posted by ag...@apache.org.
Merge branch 'ignite-1.4' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8493576c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8493576c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8493576c
Branch: refs/heads/ignite-1171
Commit: 8493576c1560a6a2884ff4046b20df155356a710
Parents: 6aa0ee1 d64fc9d
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Sep 17 11:54:33 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Sep 17 11:54:33 2015 -0700
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 7 -
.../processors/cache/GridCacheContext.java | 6 +-
.../cache/GridCacheEvictionManager.java | 6 +-
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 ++++---
.../processors/cache/GridCacheMessage.java | 7 +
.../processors/cache/GridCacheMvccManager.java | 34 ++++-
.../GridCachePartitionExchangeManager.java | 41 ++++--
.../processors/cache/GridCacheProcessor.java | 28 ++--
.../GridDistributedLockResponse.java | 6 +-
.../GridDistributedTxPrepareResponse.java | 6 +-
.../distributed/dht/GridDhtTopologyFuture.java | 6 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 ++-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 +-
.../colocated/GridDhtColocatedLockFuture.java | 44 ++++--
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 19 ++-
.../distributed/near/GridNearGetResponse.java | 6 +-
.../distributed/near/GridNearLockFuture.java | 26 +++-
.../near/GridNearOptimisticTxPrepareFuture.java | 20 ++-
.../near/GridNearTxFinishResponse.java | 6 +-
.../cache/query/GridCacheQueryResponse.java | 6 +-
.../continuous/CacheContinuousQueryHandler.java | 12 +-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 ++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +++-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../ignite/testframework/GridTestUtils.java | 14 +-
.../cache/CacheIndexStreamerTest.java | 137 +++++++++++++++++++
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
.../ignite/visor/commands/VisorConsole.scala | 37 +++--
36 files changed, 469 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8493576c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
[4/7] ignite git commit: ignite-1452 Cancel cache operations on node
stop
Posted by ag...@apache.org.
ignite-1452 Cancel cache operations on node stop
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/585761f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/585761f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/585761f2
Branch: refs/heads/ignite-1171
Commit: 585761f28e8b70487eaf2198d6ea39f7232b088d
Parents: b8c0b30
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 17 16:26:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 17 16:26:02 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 7 ---
.../processors/cache/GridCacheContext.java | 6 +--
.../cache/GridCacheEvictionManager.java | 6 +--
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 +++++++++++++-------
.../processors/cache/GridCacheMessage.java | 7 +++
.../processors/cache/GridCacheMvccManager.java | 34 +++++++++++---
.../GridCachePartitionExchangeManager.java | 41 +++++++++++++----
.../processors/cache/GridCacheProcessor.java | 28 ++++++++----
.../GridDistributedLockResponse.java | 6 +--
.../GridDistributedTxPrepareResponse.java | 6 +--
.../distributed/dht/GridDhtTopologyFuture.java | 6 ++-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 ++++---
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 ++---
.../colocated/GridDhtColocatedLockFuture.java | 44 ++++++++++++++----
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +--
.../GridDhtPartitionsExchangeFuture.java | 19 ++++++--
.../distributed/near/GridNearGetResponse.java | 6 +--
.../distributed/near/GridNearLockFuture.java | 26 ++++++++---
.../near/GridNearOptimisticTxPrepareFuture.java | 20 +++++++--
.../near/GridNearTxFinishResponse.java | 6 +--
.../cache/query/GridCacheQueryResponse.java | 6 +--
.../continuous/CacheContinuousQueryHandler.java | 12 +++--
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 +++++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +++++++---
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
32 files changed, 292 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index daf7d23..82db059 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1806,8 +1806,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
notifyLifecycleBeansEx(LifecycleEventType.BEFORE_NODE_STOP);
}
- GridCacheProcessor cacheProcessor = ctx.cache();
-
List<GridComponent> comps = ctx.components();
ctx.marshallerContext().onKernalStop();
@@ -1856,11 +1854,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Note that interrupted flag is cleared.
interrupted = true;
}
- finally {
- // Cleanup even on successful acquire.
- if (cacheProcessor != null)
- cacheProcessor.cancelUserOperations();
- }
}
if (interrupted)
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 86ba3e6..5385dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -283,12 +283,12 @@ public class GridCacheContext<K, V> implements Externalizable {
GridCacheEvictionManager evictMgr,
GridCacheQueryManager<K, V> qryMgr,
CacheContinuousQueryManager contQryMgr,
- GridCacheAffinityManager affMgr,
CacheDataStructuresManager dataStructuresMgr,
GridCacheTtlManager ttlMgr,
GridCacheDrManager drMgr,
CacheConflictResolutionManager<K, V> rslvrMgr,
- CachePluginManager pluginMgr
+ CachePluginManager pluginMgr,
+ GridCacheAffinityManager affMgr
) {
assert ctx != null;
assert sharedCtx != null;
@@ -323,12 +323,12 @@ public class GridCacheContext<K, V> implements Externalizable {
this.evictMgr = add(evictMgr);
this.qryMgr = add(qryMgr);
this.contQryMgr = add(contQryMgr);
- this.affMgr = add(affMgr);
this.dataStructuresMgr = add(dataStructuresMgr);
this.ttlMgr = add(ttlMgr);
this.drMgr = add(drMgr);
this.rslvrMgr = add(rslvrMgr);
this.pluginMgr = add(pluginMgr);
+ this.affMgr = add(affMgr);
log = ctx.log(getClass());
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 3e0e2f9..1c34c76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1943,7 +1943,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
lock.readLock().unlock();
}
- if (res.error())
+ if (res.evictError())
// Complete future, since there was a class loading error on at least one node.
complete(false);
else
@@ -1985,14 +1985,14 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
boolean err = F.forAny(resMap.values(), new P1<GridCacheEvictionResponse>() {
@Override public boolean apply(GridCacheEvictionResponse res) {
- return res.error();
+ return res.evictError();
}
});
if (err) {
Collection<UUID> ids = F.view(resMap.keySet(), new P1<UUID>() {
@Override public boolean apply(UUID e) {
- return resMap.get(e).error();
+ return resMap.get(e).evictError();
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
index 4d40c8d..aa3911b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
@@ -116,7 +116,7 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
/**
* @return {@code True} if request processing has finished with error.
*/
- boolean error() {
+ boolean evictError() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b55c84d..421ec82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -182,8 +182,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
if (c == null) {
- U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
- ", nodeId=" + nodeId + ']');
+ if (cctx.kernalContext().isStopping()) {
+ if (log.isDebugEnabled())
+ log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg +
+ ", nodeId=" + nodeId + ']');
+ }
+ else {
+ U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
+ ", nodeId=" + nodeId + ']');
+ }
return;
}
@@ -596,9 +603,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*
* @param msg Message to send.
* @param destNodeId Destination node ID.
+ * @return {@code True} if should send message.
* @throws IgniteCheckedException If failed.
*/
- private void onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+ private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+ if (msg.error() != null && cctx.kernalContext().isStopping())
+ return false;
+
if (msg.messageId() < 0)
// Generate and set message ID.
msg.messageId(idGen.incrementAndGet());
@@ -609,6 +620,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled && msg instanceof GridCacheDeployable)
cctx.deploy().prepare((GridCacheDeployable)msg);
}
+
+ return true;
}
/**
@@ -624,7 +637,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
assert !node.isLocal();
- onSend(msg, node.id());
+ if (!onSend(msg, node.id()))
+ return;
if (log.isDebugEnabled())
log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
@@ -663,12 +677,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param msg Message to send.
* @param plc IO policy.
* @param fallback Callback for failed nodes.
- * @return {@code True} if nodes are empty or message was sent, {@code false} if
- * all nodes have left topology while sending this message.
* @throws IgniteCheckedException If send failed.
*/
@SuppressWarnings({"BusyWait", "unchecked"})
- public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
+ public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
@Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
assert nodes != null;
assert msg != null;
@@ -677,10 +689,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Message will not be sent as collection of nodes is empty: " + msg);
- return true;
+ return;
}
- onSend(msg, null);
+ if (!onSend(msg, null))
+ return;
if (log.isDebugEnabled())
log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
@@ -709,7 +722,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (fallback != null && !fallback.apply(n))
// If fallback signalled to stop.
- return false;
+ return;
added = true;
}
@@ -721,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug("Message will not be sent because all nodes left topology [msg=" + msg +
", nodes=" + U.toShortString(nodes) + ']');
- return false;
+ return;
}
}
@@ -737,7 +750,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (fallback != null && !fallback.apply(n))
// If fallback signalled to stop.
- return false;
+ return;
added = true;
}
@@ -757,7 +770,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" +
U.toShortString(nodes) + ']');
- return false;
+ return;
}
if (log.isDebugEnabled())
@@ -768,8 +781,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
-
- return true;
}
/**
@@ -800,7 +811,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc,
long timeout) throws IgniteCheckedException {
- onSend(msg, node.id());
+ if (!onSend(msg, node.id()))
+ return;
int cnt = 0;
@@ -854,7 +866,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
assert node != null;
assert msg != null;
- onSend(msg, null);
+ if (!onSend(msg, null))
+ return;
try {
cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 4e737a0..55688e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -77,6 +77,13 @@ public abstract class GridCacheMessage implements Message {
protected int cacheId;
/**
+ * @return Error, if any.
+ */
+ @Nullable public Throwable error() {
+ return null;
+ }
+
+ /**
* Gets next ID for indexed message ID.
*
* @return Message ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 555bbda..e2d0302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -120,6 +120,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
private IgniteLogger exchLog;
+ /** */
+ private volatile boolean stopping;
+
/** Lock callback. */
@GridToStringExclude
private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -325,8 +328,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* Cancels all client futures.
*/
- public void cancelClientFutures() {
- cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+ public void onStop() {
+ stopping = true;
+
+ cancelClientFutures(stopError());
}
/** {@inheritDoc} */
@@ -362,6 +367,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Node stop exception.
+ */
+ private IgniteCheckedException stopError() {
+ return new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+ }
+
+ /**
* @param from From version.
* @return To version.
*/
@@ -385,8 +397,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
- if (cctx.kernalContext().clientDisconnected())
- ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+ onFutureAdded(fut);
}
/**
@@ -507,17 +518,26 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
fut.onNodeLeft(n.id());
}
- if (cctx.kernalContext().clientDisconnected())
- ((GridFutureAdapter)fut).onDone(disconnectedError(null));
-
// Just in case if future was completed before it was added.
if (fut.isDone())
removeFuture(fut);
+ else
+ onFutureAdded(fut);
return true;
}
/**
+ * @param fut Future.
+ */
+ private void onFutureAdded(IgniteInternalFuture<?> fut) {
+ if (stopping)
+ ((GridFutureAdapter)fut).onDone(stopError());
+ else if (cctx.kernalContext().clientDisconnected())
+ ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+ }
+
+ /**
* @param fut Future to remove.
* @return {@code True} if removed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20340d1..34c571c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -147,6 +147,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
+ /** */
+ private volatile IgniteCheckedException stopErr;
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -381,7 +384,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
- IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ?
+ stopErr = cctx.kernalContext().clientDisconnected() ?
new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
"Client node disconnected: " + cctx.gridName()) :
new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
@@ -391,11 +394,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (exchFuts0 != null) {
for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
- f.onDone(err);
+ f.onDone(stopErr);
}
for (AffinityReadyFuture f : readyFuts.values())
- f.onDone(err);
+ f.onDone(stopErr);
+
+ for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
+ f.onDone(stopErr);
+
+ if (locExchFut != null)
+ locExchFut.onDone(stopErr);
U.cancel(exchWorker);
@@ -519,6 +528,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
fut.onDone(topVer);
}
+ else if (stopErr != null)
+ fut.onDone(stopErr);
return fut;
}
@@ -791,6 +802,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (discoEvt != null)
fut.onEvent(exchId, discoEvt);
+ if (stopErr != null)
+ fut.onDone(stopErr);
+
return fut;
}
@@ -799,12 +813,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param err Error.
*/
public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
- if (err == null) {
- AffinityTopologyVersion topVer = exchFut.topologyVersion();
+ AffinityTopologyVersion topVer = exchFut.topologyVersion();
- if (log.isDebugEnabled())
- log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ']');
+ if (log.isDebugEnabled())
+ log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+ if (err == null) {
while (true) {
AffinityTopologyVersion readyVer = readyTopVer.get();
@@ -825,8 +839,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
}
- else if (log.isDebugEnabled())
- log.debug("Exchange done with error [fut=" + exchFut + ", err=" + err + ']');
+ else {
+ for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+ if (entry.getKey().compareTo(topVer) <= 0) {
+ if (log.isDebugEnabled())
+ log.debug("Completing created topology ready future with error " +
+ "[ver=" + topVer + ", fut=" + entry.getValue() + ']');
+
+ entry.getValue().onDone(err);
+ }
+ }
+ }
ExchangeFutureSet exchFuts0 = exchFuts;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4ae0baa..c92de7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -960,6 +960,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
+ cancelFutures();
+
List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
@@ -1323,12 +1325,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
evictMgr,
qryMgr,
contQryMgr,
- affMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
- pluginMgr
+ pluginMgr,
+ affMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -1452,12 +1454,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
evictMgr,
qryMgr,
contQryMgr,
- affMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
- pluginMgr
+ pluginMgr,
+ affMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -2325,9 +2327,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
- if (ctx.clientDisconnected())
+ if (ctx.isStopping()) {
+ err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+ "node is stopping.");
+ }
+ else if (ctx.clientDisconnected()) {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
+ }
}
catch (IgniteCheckedException e) {
err = e;
@@ -3036,9 +3043,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
- if (ctx.clientDisconnected())
+ if (ctx.isStopping()) {
+ err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+ "node is stopping.");
+ }
+ else if (ctx.clientDisconnected()) {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
+ }
}
catch (IgniteCheckedException e) {
err = e;
@@ -3104,8 +3116,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Cancel all user operations.
*/
- public void cancelUserOperations() {
- sharedCtx.mvcc().cancelClientFutures();
+ private void cancelFutures() {
+ sharedCtx.mvcc().onStop();
Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index cdb878d..8a95b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -137,10 +137,8 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
return futId;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 4264830..e798458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -67,10 +67,8 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
this.err = err;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index c11a3d7..6ade26f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.jetbrains.annotations.Nullable;
/**
* Future that implements a barrier after which dht topology is safe to use. Topology is considered to be
@@ -38,9 +39,10 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
public AffinityTopologyVersion topologyVersion();
/**
- * Returns is cache topology valid.
+ * Returns error is cache topology is not valid.
+ *
* @param cctx Cache context.
* @return valid ot not.
*/
- public boolean isCacheTopologyValid(GridCacheContext cctx);
+ @Nullable public Throwable validateCache(GridCacheContext cctx);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b9514a9..1a869e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1217,7 +1217,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
Throwable err = res.error();
// Log error before sending reply.
- if (err != null && !(err instanceof GridCacheLockTimeoutException))
+ if (err != null && !(err instanceof GridCacheLockTimeoutException) && !ctx.kernalContext().isStopping())
U.error(log, "Failed to acquire lock for request: " + req, err);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 33651bc..04d36e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -97,16 +97,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* Sets update error.
- * @param err
+ *
+ * @param err Error.
*/
public void onError(IgniteCheckedException err){
this.err = err;
}
- /**
- * @return Gets update error.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
@@ -154,8 +153,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
nearEvicted.add(key);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d93f68f..fb2c5ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -385,9 +385,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -811,6 +812,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
Exception err = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
int size = keys.size();
@@ -837,13 +839,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (size == 1 && !fastMap) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq = mapSingleUpdate();
+ singleReq0 = singleReq = mapSingleUpdate();
}
else {
pendingMappings = mapUpdate(topNodes);
if (pendingMappings.size() == 1)
- singleReq = F.firstValue(pendingMappings);
+ singleReq0 = singleReq = F.firstValue(pendingMappings);
else {
if (syncMode == PRIMARY_SYNC) {
mappings = U.newHashMap(pendingMappings.size());
@@ -874,8 +876,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
// Optimize mapping for single key.
- if (singleReq != null)
- mapSingle(singleReq.nodeId(), singleReq);
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
else {
assert pendingMappings != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 5f5fbb5..ccb67d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -198,6 +198,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
boolean skipStore,
boolean clientReq
) {
+ assert futVer != null;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 8bc145c..376f4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -116,6 +116,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param futVer Future version.
*/
public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+ assert futVer != null;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
@@ -149,16 +151,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* Sets update error.
- * @param err
+ *
+ * @param err Error.
*/
public void error(IgniteCheckedException err){
this.err = err;
}
- /**
- * @return Update error, if any.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 596ec77..1a08265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -524,7 +525,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtColocatedLockFuture.class, this, "inTx", inTx(), "super", super.toString());
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ if (isMini(f)) {
+ MiniFuture m = (MiniFuture)f;
+
+ return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ }
+ else
+ return "[loc=true, done=" + f.isDone() + "]";
+ }
+ });
+
+ return S.toString(GridDhtColocatedLockFuture.class, this,
+ "innerFuts", futs,
+ "inTx", inTx(),
+ "super", super.toString());
}
/**
@@ -565,9 +581,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -612,9 +629,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -643,10 +661,15 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
+ fut.get();
+
mapOnTopology(remap, c);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
@@ -1327,8 +1350,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
+ fut.get();
+
remap();
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 36a2da1..eaed424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -283,7 +283,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
// Fail the whole thing.
if (e instanceof ClusterTopologyCheckedException)
fut.onResult((ClusterTopologyCheckedException)e);
- else
+ else if (!cctx.kernalContext().isStopping())
fut.onResult(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index d31f096..93e39ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -98,10 +98,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
this.err = err;
}
- /**
- * @return Error, if any.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 865bbdc..a1b03c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1081,9 +1081,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/** {@inheritDoc} */
- @Override public boolean isCacheTopologyValid(GridCacheContext cctx) {
- return cctx.config().getTopologyValidator() != null && cacheValidRes.containsKey(cctx.cacheId()) ?
- cacheValidRes.get(cctx.cacheId()) : true;
+ @Override public Throwable validateCache(GridCacheContext cctx) {
+ Throwable err = error();
+
+ if (err != null)
+ return err;
+
+ if (cctx.config().getTopologyValidator() != null) {
+ Boolean res = cacheValidRes.get(cctx.cacheId());
+
+ if (res != null && !res) {
+ return new IgniteCheckedException("Failed to perform cache operation " +
+ "(cache topology is not valid): " + cctx.name());
+ }
+ }
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 3276377..d4493a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -163,10 +163,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
return topVer != null ? topVer : super.topologyVersion();
}
- /**
- * @return Error.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index f3e5ca3..dcc8da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -703,9 +703,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -749,9 +750,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -777,10 +779,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
+ fut.get();
+
mapOnTopology(remap);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
@@ -1435,8 +1442,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
+ fut.get();
+
remap();
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2048fdf..25028c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -319,7 +319,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
assert ctx != null : cacheId;
- if (!topFut.isCacheTopologyValid(ctx)) {
+ Throwable err = topFut.validateCache(ctx);
+
+ if (err != null) {
if (invalidCaches != null)
invalidCaches.append(", ");
else
@@ -343,12 +345,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
else {
topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
try {
+ fut.get();
+
prepareOnTopology(remap, c);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.txContextReset();
}
@@ -841,7 +848,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
if (affFut != null && !affFut.isDone()) {
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- remap();
+ try {
+ fut.get();
+
+ remap();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index cec7d73..c860baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -75,10 +75,8 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
this.err = err;
}
- /**
- * @return Error.
- */
- @Nullable public Throwable error() {
+ /** {@inheritDoc} */
+ @Nullable @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 3e4cdeb..78e2ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -193,10 +193,8 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
return reqId;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index df6b4b7..c99e07f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -97,6 +98,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;
+ /** */
+ private transient int cacheId;
+
/**
* Required by {@link Externalizable}.
*/
@@ -145,6 +149,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.ignoreExpired = ignoreExpired;
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
+
+ cacheId = CU.cacheId(cacheName);
}
/** {@inheritDoc} */
@@ -457,6 +463,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
sync = in.readBoolean();
ignoreExpired = in.readBoolean();
taskHash = in.readInt();
+
+ cacheId = CU.cacheId(cacheName);
}
/**
@@ -466,9 +474,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
assert ctx != null;
- GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);
-
- return cache == null ? null : cache.context();
+ return ctx.cache().<K, V>context().cacheContext(cacheId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 00b91dd..6ca1f72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1105,6 +1105,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Commits transaction to transaction manager. Used for one-phase commit transactions only.
+ *
+ * @param commit If {@code true} commits transaction, otherwise rollbacks.
*/
public void tmFinish(boolean commit) {
assert onePhaseCommit();
@@ -1118,7 +1120,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
state(commit ? COMMITTED : ROLLED_BACK);
- boolean needsCompletedVersions = needsCompletedVersions();
+ boolean needsCompletedVersions = commit && needsCompletedVersions();
assert !needsCompletedVersions || completedBase != null;
assert !needsCompletedVersions || committedVers != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
index 2aae6ef..6bfd4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
/**
@@ -76,6 +77,15 @@ public class GridSpinBusyLock {
}
/**
+ * @param millis Timeout.
+ * @return {@code True} if lock was acquired.
+ * @throws InterruptedException If interrupted.
+ */
+ public boolean tryBlock(long millis) throws InterruptedException {
+ return lock.tryWriteLock(millis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
* Makes possible for activities entering busy state again.
*/
public void unblock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 6b4d473..151167a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -184,20 +185,29 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
String val = "value-" + k;
- cache.invoke(key, new Processor(val));
+ procs.put(key, new Processor(val));
}
- cache.invokeAll(procs);
+ Map<String, EntryProcessorResult<Integer>> resMap = cache.invokeAll(procs);
+
+ for (String key : procs.keySet()) {
+ EntryProcessorResult<Integer> res = resMap.get(key);
+
+ assertNotNull(res);
+ assertEquals(k + 1, (Object) res.get());
+ }
}
else {
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
for (int i = 0; i < NUM_SETS; i++) {
String key = "set-" + i;
String val = "value-" + k;
- IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+ Integer valsCnt = cache.invoke(key, new Processor(val));
- cache.invoke(key, new Processor(val));
+ assertEquals(k + 1, (Object)valsCnt);
}
}
}
@@ -275,7 +285,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/** */
- private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+ private static class Processor implements EntryProcessor<String, Set<String>, Integer>, Serializable {
/** */
private String val;
@@ -287,7 +297,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/** {@inheritDoc} */
- @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+ @Override public Integer process(MutableEntry<String, Set<String>> e, Object... args) {
Set<String> vals = e.getValue();
if (vals == null)
@@ -297,7 +307,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
e.setValue(vals);
- return null;
+ return vals.size();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 7aae48c..88605b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -79,12 +79,12 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCacheEvictionManager(),
new GridCacheLocalQueryManager<K, V>(),
new CacheContinuousQueryManager(),
- new GridCacheAffinityManager(),
new CacheDataStructuresManager(),
new GridCacheTtlManager(),
new GridOsCacheDrManager(),
new CacheOsConflictResolutionManager<K, V>(),
- new CachePluginManager(ctx, new CacheConfiguration())
+ new CachePluginManager(ctx, new CacheConfiguration()),
+ new GridCacheAffinityManager()
);
store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>());
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 1276405..e00611b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -185,8 +185,6 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
* @throws Exception If failed.
*/
public void testRestarts() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1452");
-
int duration = 90 * 1000;
int qryThreadNum = 4;
int restartThreadsNum = 2; // 4 + 2 = 6 nodes
[5/7] ignite git commit: Test for ignite-973.
Posted by ag...@apache.org.
Test for ignite-973.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3676cbe7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3676cbe7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3676cbe7
Branch: refs/heads/ignite-1171
Commit: 3676cbe7f5f5f73199487318d6841e50a1f73496
Parents: 585761f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 17 17:52:24 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 17 17:52:24 2015 +0300
----------------------------------------------------------------------
.../ignite/testframework/GridTestUtils.java | 14 +-
.../cache/CacheIndexStreamerTest.java | 137 +++++++++++++++++++
2 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3676cbe7/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index edf7c52..be3f0e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -633,11 +633,23 @@ public final class GridTestUtils {
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
public static <T> IgniteInternalFuture<T> runAsync(final Callable<T> task) {
+ return runAsync(task, "async-runner");
+ }
+
+ /**
+ * Runs callable task asyncronously.
+ *
+ * @param task Callable.
+ * @param threadName Thread name.
+ * @return Future with task result.
+ */
+ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+ public static <T> IgniteInternalFuture<T> runAsync(final Callable<T> task, String threadName) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to start new threads (test is being stopped).");
try {
- final GridTestSafeThreadFactory thrFactory = new GridTestSafeThreadFactory("async-runner");
+ final GridTestSafeThreadFactory thrFactory = new GridTestSafeThreadFactory(threadName);
final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() {
@Override public boolean cancel() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3676cbe7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
new file mode 100644
index 0000000..25c3b81
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheIndexStreamerTest extends GridCommonAbstractTest {
+ /** */
+ private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStreamer() throws Exception {
+ final Ignite ignite = startGrid(0);
+
+ final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final int KEYS= 10_000;
+
+ try {
+ IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get()) {
+ try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) {
+ for (int i = 0; i < 1; i++)
+ streamer.addData(rnd.nextInt(KEYS), String.valueOf(i));
+ }
+ }
+
+ return null;
+ }
+ }, "streamer-thread");
+
+ IgniteInternalFuture updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get()) {
+ for (int i = 0; i < 100; i++) {
+ Integer key = rnd.nextInt(KEYS);
+
+ cache.put(key, String.valueOf(key));
+
+ cache.remove(key);
+ }
+ }
+
+ return null;
+ }
+ }, 1, "update-thread");
+
+ U.sleep(30_000);
+
+ stop.set(true);
+
+ streamerFut.get();
+ updateFut.get();
+ }
+ finally {
+ stop.set(true);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+ ccfg.setOffHeapMaxMemory(0);
+ ccfg.setBackups(1);
+ ccfg.setIndexedTypes(Integer.class, String.class);
+
+ return ccfg;
+ }
+
+
+}