You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:43:30 UTC
[22/50] [abbrv] incubator-ignite git commit: # ignite-929 close does
not destroy cache (cherry picked from commit e3fba88)
# ignite-929 close does not destroy cache (cherry picked from commit e3fba88)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a233fa00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a233fa00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a233fa00
Branch: refs/heads/ignite-1124
Commit: a233fa00fcfb1266acdecfec45b7ac6024cc9791
Parents: 3dcf891
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 10:20:11 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 10:48:44 2015 +0300
----------------------------------------------------------------------
.../examples/ScalarCacheAffinityExample.scala | 2 +-
.../scalar/examples/ScalarCacheExample.scala | 2 +-
.../ScalarCachePopularNumbersExample.scala | 2 +-
.../examples/ScalarCacheQueryExample.scala | 2 +-
.../examples/ScalarSnowflakeSchemaExample.scala | 4 +-
.../java/org/apache/ignite/IgniteCache.java | 14 +-
.../org/apache/ignite/cache/CacheManager.java | 13 +-
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../discovery/GridDiscoveryManager.java | 23 +-
.../cache/DynamicCacheChangeRequest.java | 39 +-
.../processors/cache/GridCacheGateway.java | 4 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 102 ++-
.../processors/cache/IgniteCacheProxy.java | 448 +++++++---
.../distributed/dht/GridDhtCacheEntry.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 30 +-
.../visor/cache/VisorCacheStopTask.java | 2 +-
.../affinity/IgniteClientNodeAffinityTest.java | 14 +-
.../IgniteFairAffinityDynamicCacheSelfTest.java | 3 +-
...cheStoreSessionListenerAbstractSelfTest.java | 111 ++-
.../GridCacheTxLoadFromStoreOnLockSelfTest.java | 34 +-
.../CacheMetricsForClusterGroupSelfTest.java | 10 +-
.../cache/CacheOffheapMapEntrySelfTest.java | 7 +-
.../cache/CacheStopAndDestroySelfTest.java | 859 +++++++++++++++++++
...eUsageMultinodeDynamicStartAbstractTest.java | 2 +-
...ProjectionForCachesOnDaemonNodeSelfTest.java | 2 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 140 +--
...teCacheClientNodePartitionsExchangeTest.java | 29 +-
...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 2 +-
.../DataStreamerMultinodeCreateCacheTest.java | 14 +-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../CacheConfigurationP2PTestClient.java | 4 +-
32 files changed, 1593 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
index fbf66bc..40b947d 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
@@ -62,7 +62,7 @@ object ScalarCacheAffinityExample extends App {
visitUsingMapKeysToNodes(cache)
}
finally {
- cache.close()
+ cache.destroy()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
index 42e8ca4..0bf8d6f 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
@@ -50,7 +50,7 @@ object ScalarCacheExample extends App {
basicOperations()
}
finally {
- cache.close()
+ cache.destroy()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
index 828c5a3..d113297 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
@@ -93,7 +93,7 @@ object ScalarCachePopularNumbersExample extends App {
}
}
finally {
- cache.close()
+ cache.destroy()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
index b8054eb..1a42947 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
@@ -55,7 +55,7 @@ object ScalarCacheQueryExample {
example(ignite$)
}
finally {
- cache.close()
+ cache.destroy()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
index 2656f44..33b2fcc 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
@@ -86,11 +86,11 @@ object ScalarSnowflakeSchemaExample {
queryProductPurchases()
}
finally {
- factCache.close()
+ factCache.destroy()
}
}
finally {
- dimCache.close()
+ dimCache.destroy()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index c8d6d7a..4938ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -543,9 +543,21 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
CacheEntryProcessor<K, V, T> entryProcessor, Object... args);
/**
+ * Closes this cache instance.
+ * <p>
+ * For local cache equivalent to {@link #destroy()}.
+ * For distributed caches, if called on clients, stops client cache, if called on a server node,
+ * just closes this cache instance and does not destroy cache data.
+ * <p>
+ * After cache instance is closed another {@link IgniteCache} instance for the same
+ * cache can be created using {@link Ignite#cache(String)} method.
+ */
+ @Override public void close();
+
+ /**
* Completely deletes the cache with all its data from the system on all cluster nodes.
*/
- @Override void close();
+ public void destroy();
/**
* This cache node to re-balance its partitions. This method is usually used when
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
index 9ba50d1..bc6df76 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
@@ -130,6 +130,7 @@ public class CacheManager implements javax.cache.CacheManager {
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public <K, V, C extends Configuration<K, V>> Cache<K, V> createCache(String cacheName, C cacheCfg)
throws IllegalArgumentException {
kernalGateway.readLock();
@@ -155,11 +156,11 @@ public class CacheManager implements javax.cache.CacheManager {
IgniteCache<K, V> res = ignite.createCache(igniteCacheCfg);
- ((IgniteCacheProxy<K, V>)res).setCacheManager(this);
-
if (res == null)
throw new CacheException();
+ ((IgniteCacheProxy<K, V>)res).setCacheManager(this);
+
if (igniteCacheCfg.isManagementEnabled())
enableManagement(cacheName, true);
@@ -219,6 +220,7 @@ public class CacheManager implements javax.cache.CacheManager {
/**
* @param cacheName Cache name.
+ * @return Cache.
*/
@Nullable private <K, V> IgniteCache<K, V> getCache0(String cacheName) {
if (cacheName == null)
@@ -272,11 +274,13 @@ public class CacheManager implements javax.cache.CacheManager {
}
if (cache != null)
- cache.close();
+ cache.destroy();
}
/**
* @param cacheName Cache name.
+ * @param objName Object name.
+ * @return Object name instance.
*/
private ObjectName getObjectName(String cacheName, String objName) {
String mBeanName = "javax.cache:type=" + objName + ",CacheManager="
@@ -339,7 +343,8 @@ public class CacheManager implements javax.cache.CacheManager {
/**
* @param mxbean MXBean.
- * @param name cache name.
+ * @param name Cache name.
+ * @param beanType Bean type.
*/
private void registerCacheObject(Object mxbean, String name, String beanType) {
MBeanServer mBeanSrv = ignite.configuration().getMBeanServer();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d6ddf79..024dc7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2436,7 +2436,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
IgniteInternalFuture<?> stopFut;
try {
- stopFut = ctx.cache().dynamicStopCache(cacheName);
+ stopFut = ctx.cache().dynamicDestroyCache(cacheName);
}
finally {
unguard();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a8ce8ff..eae07ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -263,6 +263,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Removes near node ID from cache filter.
+ *
+ * @param cacheName Cache name.
+ * @param clientNodeId Near node ID.
+ */
+ public void onClientCacheClose(String cacheName, UUID clientNodeId) {
+ CachePredicate predicate = registeredCaches.get(cacheName);
+
+ if (predicate != null)
+ predicate.onNodeLeft(clientNodeId);
+ }
+
+ /**
* @return Client nodes map.
*/
public Map<String, Map<UUID, Boolean>> clientNodesMap() {
@@ -1079,9 +1092,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node for given ID is alive.
*/
public boolean alive(UUID nodeId) {
+ return getAlive(nodeId) != null;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return Node if node is alive.
+ */
+ @Nullable public ClusterNode getAlive(UUID nodeId) {
assert nodeId != null;
- return getSpi().getNode(nodeId) != null; // Go directly to SPI without checking disco cache.
+ return getSpi().getNode(nodeId); // Go directly to SPI without checking disco cache.
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index c08a179..7af1572 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -57,6 +57,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Stop flag. */
private boolean stop;
+ /** Close flag. */
+ private boolean close;
+
/** Fail if exists flag. */
private boolean failIfExists;
@@ -68,23 +71,10 @@ public class DynamicCacheChangeRequest implements Serializable {
*
* @param cacheName Cache stop name.
* @param initiatingNodeId Initiating node ID.
- * @param stop Stop flag.
*/
- public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId, boolean stop) {
+ public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) {
this.cacheName = cacheName;
this.initiatingNodeId = initiatingNodeId;
-
- this.stop = stop;
- }
-
- /**
- * Constructor means for start requests.
- *
- * @param cacheName Cache name.
- * @param initiatingNodeId Initiating node ID.
- */
- public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) {
- this(cacheName, initiatingNodeId, false);
}
/**
@@ -130,6 +120,13 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
+ * @param stop New stop flag.
+ */
+ public void stop(boolean stop) {
+ this.stop = stop;
+ }
+
+ /**
* @return Cache name.
*/
public String cacheName() {
@@ -220,6 +217,20 @@ public class DynamicCacheChangeRequest implements Serializable {
this.failIfExists = failIfExists;
}
+ /**
+ * @return Close flag.
+ */
+ public boolean close() {
+ return close;
+ }
+
+ /**
+ * @param close New close flag.
+ */
+ public void close(boolean close) {
+ this.close = close;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index d9d151c..f2beb0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -68,7 +68,7 @@ public class GridCacheGateway<K, V> {
*
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
- public boolean enterIfNotClosed() {
+ public boolean enterIfNotStopped() {
onEnter();
// Must unlock in case of unexpected errors to avoid
@@ -89,7 +89,7 @@ public class GridCacheGateway<K, V> {
*
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
- public boolean enterIfNotClosedNoLock() {
+ public boolean enterIfNotStoppedNoLock() {
onEnter();
return !stopped;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index af87685..4398b4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -156,16 +156,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Validate requests to check if event should trigger partition exchange.
for (DynamicCacheChangeRequest req : batch.requests()) {
- if (cctx.cache().dynamicCacheRegistered(req))
+ if (cctx.cache().exchangeNeeded(req))
valid.add(req);
else
cctx.cache().completeStartFuture(req);
}
if (!F.isEmpty(valid)) {
- exchId = exchangeId(n.id(),
- affinityTopologyVersion(e),
- e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
exchFut = exchangeFuture(exchId, e, valid);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index de1eac2..bb87a86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1390,10 +1390,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return {@code True} if change request was registered to apply.
*/
@SuppressWarnings("IfMayBeConditional")
- public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
+ public boolean exchangeNeeded(DynamicCacheChangeRequest req) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
if (desc != null) {
+ if (req.close()) {
+ assert req.initiatingNodeId() != null : req;
+
+ return true;
+ }
+
if (desc.deploymentId().equals(req.deploymentId())) {
if (req.start())
return !desc.cancelled();
@@ -1515,20 +1521,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Stop request.
*/
public void blockGateway(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close();
- // Break the proxy before exchange future is done.
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
+ if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
+ // Break the proxy before exchange future is done.
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
- if (proxy != null)
- proxy.gate().block();
+ if (proxy != null) {
+ if (req.stop())
+ proxy.gate().block();
+ else
+ proxy.closeProxy();
+ }
+ }
}
/**
* @param req Request.
*/
private void stopGateway(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() : req;
// Break the proxy before exchange future is done.
IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName()));
@@ -1541,7 +1553,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Stop request.
*/
public void prepareCacheStop(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close() : req;
GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName()));
@@ -1597,6 +1609,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
registeredCaches.remove(masked, desc);
}
+ else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
+
+ if (proxy != null) {
+ if (proxy.context().affinityNode()) {
+ GridCacheAdapter<?, ?> cache = caches.get(masked);
+
+ if (cache != null)
+ jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
+ }
+ else {
+ proxy.context().gate().onStopped();
+
+ prepareCacheStop(req);
+ }
+ }
+ }
completeStartFuture(req);
}
@@ -2005,13 +2034,35 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param cacheName Cache name to stop.
- * @return Future that will be completed when cache is stopped.
+ * @param cacheName Cache name to destroy.
+ * @return Future that will be completed when cache is destroyed.
*/
- public IgniteInternalFuture<?> dynamicStopCache(String cacheName) {
+ public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) {
checkEmptyTransactions();
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true);
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+
+ t.stop(true);
+
+ return F.first(initiateCacheChanges(F.asList(t), false));
+ }
+
+
+ /**
+ * @param cacheName Cache name to close.
+ * @return Future that will be completed when cache is closed.
+ */
+ public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName));
+
+ if (proxy == null || proxy.proxyClosed())
+ return new GridFinishedFuture<>(); // No-op.
+
+ checkEmptyTransactions();
+
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+
+ t.close(true);
return F.first(initiateCacheChanges(F.asList(t), false));
}
@@ -2031,16 +2082,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req);
try {
- if (req.stop()) {
+ if (req.stop() || req.close()) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
if (desc == null)
// No-op.
fut.onDone();
else {
+ assert desc.cacheConfiguration() != null : desc;
+
+ if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) {
+ req.close(false);
+
+ req.stop(true);
+ }
+
IgniteUuid dynamicDeploymentId = desc.deploymentId();
- assert dynamicDeploymentId != null;
+ assert dynamicDeploymentId != null : desc;
// Save deployment ID to avoid concurrent stops.
req.deploymentId(dynamicDeploymentId);
@@ -2188,9 +2247,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.nearCacheConfiguration() != null);
}
else {
+ assert req.stop() || req.close() : req;
+
if (desc == null) {
- // If local node initiated start, fail the start future.
- DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+ // If local node initiated start, finish future.
+ DynamicCacheStartFuture changeFut =
+ (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) {
// No-op.
@@ -2200,9 +2262,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return;
}
- desc.onCancelled();
+ if (req.stop()) {
+ desc.onCancelled();
- ctx.discovery().removeCacheFilter(req.cacheName());
+ ctx.discovery().removeCacheFilter(req.cacheName());
+ }
+ else
+ ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b31b2e8..9767f49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
-import org.apache.ignite.cache.CacheManager;
import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.CacheManager;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
@@ -171,19 +171,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public CacheMetrics metrics() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().metrics();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public CacheMetrics metrics(ClusterGroup grp) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
@@ -202,19 +206,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public CacheMetricsMXBean mxBean() {
- CacheOperationContext prev = gate.enter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().mxBean();
}
finally {
- gate.leave(prev);
+ onLeave(gate, prev);
}
}
@@ -230,19 +236,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Nullable @Override public Cache.Entry<K, V> randomEntry() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().randomEntry();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
@@ -251,7 +261,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -262,7 +272,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withNoRetries() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
boolean noRetries = opCtx != null && opCtx.noRetries();
@@ -280,14 +292,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -296,7 +310,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
ctx.cache().globalLoadCache(p, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -307,7 +321,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -316,7 +332,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.localLoadCache(p, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -327,7 +343,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -339,7 +357,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndPutIfAbsent(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -359,13 +377,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean isLocalLocked(K key, boolean byCurrThread) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -379,7 +399,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
final CacheQuery<Map.Entry<K,V>> qry;
final CacheQueryFuture<Map.Entry<K,V>> fut;
- boolean isKeepPortable = opCtx != null ? opCtx.isKeepPortable() : false;
+ boolean isKeepPortable = opCtx != null && opCtx.isKeepPortable();
if (filter instanceof ScanQuery) {
IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
@@ -444,11 +464,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * @param local Enforce local.
+ * @param loc Enforce local.
* @return Local node cluster group.
*/
- private ClusterGroup projection(boolean local) {
- if (local || ctx.isLocal() || isReplicatedDataNode())
+ private ClusterGroup projection(boolean loc) {
+ if (loc || ctx.isLocal() || isReplicatedDataNode())
return ctx.kernalContext().grid().cluster().forLocal();
if (ctx.isReplicated())
@@ -517,7 +537,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <R> QueryCursor<R> query(Query<R> qry) {
A.notNull(qry, "qry");
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -558,7 +580,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw new CacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -589,7 +611,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localEntries(peekModes);
@@ -598,37 +622,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public QueryMetrics queryMetrics() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.context().queries().metrics();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localEvict(Collection<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.evictAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localPeek(key, peekModes, null);
@@ -637,20 +667,22 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localPromote(Set<? extends K> keys) throws CacheException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.promoteAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -660,7 +692,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public int size(CachePeekMode... peekModes) throws CacheException {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -675,13 +709,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public int localSize(CachePeekMode... peekModes) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localSize(peekModes);
@@ -690,14 +726,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public V get(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -709,7 +747,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.get(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -720,7 +758,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -732,7 +772,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -743,7 +783,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -755,7 +797,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAllOutTx(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -769,7 +811,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
*/
public Map<K, V> getAll(Collection<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -781,7 +825,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -796,19 +840,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Entry set.
*/
public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.entrySetx(filter);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public boolean containsKey(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -820,13 +868,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.containsKey(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public boolean containsKeys(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -838,7 +888,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.containsKeys(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -848,7 +898,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
boolean replaceExisting,
@Nullable final CompletionListener completionLsnr
) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting);
@@ -869,14 +921,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void put(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -896,7 +950,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.put(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -907,7 +961,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -919,7 +975,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndPut(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -930,7 +986,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> map) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -939,7 +997,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.putAll(map);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -950,7 +1008,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -962,7 +1022,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.putIfAbsent(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -973,7 +1033,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean remove(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -985,7 +1047,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.remove(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -996,7 +1058,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean remove(K key, V oldVal) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1008,7 +1072,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.remove(key, oldVal);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1019,7 +1083,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndRemove(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1031,7 +1097,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndRemove(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1042,7 +1108,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1054,7 +1122,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.replace(key, oldVal, newVal);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1065,7 +1133,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1077,7 +1147,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.replace(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1088,7 +1158,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1100,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndReplace(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1111,7 +1183,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void removeAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1120,7 +1194,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.removeAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1130,7 +1204,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void removeAll() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1142,13 +1218,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clear(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1160,13 +1238,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1178,13 +1258,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clear() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1196,32 +1278,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localClear(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.clearLocally(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localClearAll(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
for (K key : keys)
delegate.clearLocally(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1229,7 +1315,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1255,7 +1343,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1267,7 +1355,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1293,7 +1383,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1303,10 +1393,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
- EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1318,7 +1410,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1331,7 +1423,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
CacheEntryProcessor<K, V, T> entryProcessor,
Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1343,7 +1437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1356,7 +1450,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1368,7 +1464,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(map, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1394,17 +1490,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public void destroy() {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
+ return;
+
+ IgniteInternalFuture<?> fut;
+
+ try {
+ fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name());
+ }
+ finally {
+ onLeave(gate);
+ }
+
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
- if (!onEnterIfNoClose())
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
return;
IgniteInternalFuture<?> fut;
try {
- fut = ctx.kernalContext().cache().dynamicStopCache(ctx.name());
+ fut = ctx.kernalContext().cache().dynamicCloseCache(ctx.name());
}
finally {
- onLeave();
+ onLeave(gate);
}
try {
@@ -1417,14 +1539,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean isClosed() {
- if (!onEnterIfNoClose())
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
return true;
try {
return ctx.kernalContext().cache().context().closed(ctx);
}
finally {
- onLeave();
+ onLeave(gate);
}
}
@@ -1448,7 +1572,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
@@ -1457,13 +1583,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
@@ -1472,19 +1600,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public Iterator<Cache.Entry<K, V>> iterator() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().igniteIterator();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1516,8 +1646,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
*
* @return Projection for portable objects.
*/
+ @SuppressWarnings("unchecked")
public <K1, V1> IgniteCache<K1, V1> keepPortable() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
CacheOperationContext opCtx0 =
@@ -1535,7 +1668,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1543,7 +1676,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Cache with skip store enabled.
*/
public IgniteCache<K, V> skipStore() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
boolean skip = opCtx != null && opCtx.skipStore();
@@ -1565,7 +1700,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1592,10 +1727,69 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
+ * @return {@code True} if proxy was closed.
+ */
+ public boolean proxyClosed() {
+ return !gate.getClass().equals(GridCacheGateway.class);
+ }
+
+ /**
+ * Closes this proxy instance.
+ */
+ public void closeProxy() {
+ gate = new GridCacheGateway<K, V>(ctx) {
+ @Override public void enter() {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Override public boolean enterIfNotStopped() {
+ return false;
+ }
+
+ @Override public boolean enterIfNotStoppedNoLock() {
+ return false;
+ }
+
+ @Override public void leaveNoLock() {
+ assert false;
+ }
+
+ @Override public void leave() {
+ assert false;
+ }
+
+ @Nullable @Override public CacheOperationContext enter(@Nullable CacheOperationContext opCtx) {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Nullable @Override public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Override public void leave(CacheOperationContext prev) {
+ assert false;
+ }
+
+ @Override public void leaveNoLock(CacheOperationContext prev) {
+ assert false;
+ }
+
+ @Override public void block() {
+ // No-op.
+ }
+
+ @Override public void onStopped() {
+ // No-op.
+ }
+ };
+ }
+
+ /**
+ * @param gate Cache gateway.
* @param opCtx Cache operation context to guard.
* @return Previous projection set on this thread.
*/
- private CacheOperationContext onEnter(CacheOperationContext opCtx) {
+ private CacheOperationContext onEnter(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
if (lock)
return gate.enter(opCtx);
else
@@ -1603,21 +1797,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * On enter.
- *
+ * @param gate Cache gateway.
* @return {@code True} if enter successful.
*/
- private boolean onEnterIfNoClose() {
+ private boolean onEnterIfNoStop(GridCacheGateway<K, V> gate) {
if (lock)
- return gate.enterIfNotClosed();
+ return gate.enterIfNotStopped();
else
- return gate.enterIfNotClosedNoLock();
+ return gate.enterIfNotStoppedNoLock();
}
/**
+ * @param gate Cache gateway.
* @param opCtx Operation context to guard.
*/
- private void onLeave(CacheOperationContext opCtx) {
+ private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
if (lock)
gate.leave(opCtx);
else
@@ -1625,9 +1819,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * On leave.
+ * @param gate Cache gateway.
*/
- private void onLeave() {
+ private void onLeave(GridCacheGateway<K, V> gate) {
if (lock)
gate.leave();
else
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 89b85c4..3b411b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -597,7 +597,9 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
List<ReaderId> newRdrs = null;
for (int i = 0; i < rdrs.length; i++) {
- if (!cctx.discovery().alive(rdrs[i].nodeId())) {
+ ClusterNode node = cctx.discovery().getAlive(rdrs[i].nodeId());
+
+ if (node == null || !cctx.discovery().cacheNode(node, cacheName())) {
// Node has left and if new list has already been created, just skip.
// Otherwise, create new list and add alive nodes.
if (newRdrs == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 38a0d55..5701749 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -474,6 +474,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
oldestNode.set(oldest);
+ if (!F.isEmpty(reqs))
+ blockGateways();
+
startCaches();
// True if client node joined or failed.
@@ -489,24 +492,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
else {
assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
- boolean clientOnlyStart = true;
+ boolean clientOnlyCacheEvt = true;
for (DynamicCacheChangeRequest req : reqs) {
- if (!req.clientStartOnly()) {
- clientOnlyStart = false;
+ if (req.clientStartOnly() || req.close())
+ continue;
- break;
- }
+ clientOnlyCacheEvt = false;
+
+ break;
}
- clientNodeEvt = clientOnlyStart;
+ clientNodeEvt = clientOnlyCacheEvt;
}
if (clientNodeEvt) {
ClusterNode node = discoEvt.eventNode();
// Client need to initialize affinity for local join event or for stated client caches.
- if (!node.isLocal()) {
+ if (!node.isLocal() || clientCacheClose()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -733,9 +737,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("After waiting for partition release future: " + this);
- if (!F.isEmpty(reqs))
- blockGateways();
-
if (exchId.isLeft())
cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
@@ -839,6 +840,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return {@code True} if exchange initiated for client cache close.
+ */
+ private boolean clientCacheClose() {
+ return reqs != null && reqs.size() == 1 && reqs.iterator().next().close();
+ }
+
+ /**
*
*/
private void dumpPendingObjects() {
@@ -903,7 +911,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*/
private void blockGateways() {
for (DynamicCacheChangeRequest req : reqs) {
- if (req.stop())
+ if (req.stop() || req.close())
cctx.cache().blockGateway(req);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
index 0e848f9..83d19f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
@@ -56,7 +56,7 @@ public class VisorCacheStopTask extends VisorOneNodeTask<String, Void> {
@Override protected Void run(String cacheName) {
IgniteCache cache = ignite.cache(cacheName);
- cache.close();
+ cache.destroy();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
index 467349f..da27fb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
@@ -127,13 +127,23 @@ public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest {
ccfg.setNodeFilter(new TestNodesFilter());
- try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg)) {
+ IgniteCache<Integer, Integer> cache = client.createCache(ccfg);
+
+ try {
checkCache(null, 1);
}
+ finally {
+ cache.destroy();
+ }
- try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg, new NearCacheConfiguration())) {
+ cache = client.createCache(ccfg, new NearCacheConfiguration());
+
+ try {
checkCache(null, 1);
}
+ finally {
+ cache.destroy();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
index 18b77e0..e51be58 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
@@ -84,8 +84,7 @@ public class IgniteFairAffinityDynamicCacheSelfTest extends GridCommonAbstractTe
cache.put(i, i);
IgniteInternalFuture<Object> destFut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
+ @Override public Object call() throws Exception {
ignite(0).destroyCache(cache.getName());
return null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
index 0634197..8e53f05 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
@@ -113,12 +113,17 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
public void testAtomicCache() throws Exception {
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.ATOMIC);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
+
+ try {
cache.loadCache(null);
cache.get(1);
cache.put(1, 1);
cache.remove(1);
}
+ finally {
+ cache.destroy();
+ }
assertEquals(3, loadCacheCnt.get());
assertEquals(1, loadCnt.get());
@@ -133,12 +138,17 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
public void testTransactionalCache() throws Exception {
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
+
+ try {
cache.loadCache(null);
cache.get(1);
cache.put(1, 1);
cache.remove(1);
}
+ finally {
+ cache.destroy();
+ }
assertEquals(3, loadCacheCnt.get());
assertEquals(1, loadCnt.get());
@@ -153,15 +163,18 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
public void testExplicitTransaction() throws Exception {
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache.put(1, 1);
- cache.put(2, 2);
- cache.remove(3);
- cache.remove(4);
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
- tx.commit();
- }
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache.put(1, 1);
+ cache.put(2, 2);
+ cache.remove(3);
+ cache.remove(4);
+
+ tx.commit();
+ }
+ finally {
+ cache.destroy();
}
assertEquals(2, writeCnt.get());
@@ -176,18 +189,20 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
- try (
- IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
- IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
- ) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache1.put(1, 1);
- cache2.put(2, 2);
- cache1.remove(3);
- cache2.remove(4);
-
- tx.commit();
- }
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2);
+
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+ cache1.remove(3);
+ cache2.remove(4);
+
+ tx.commit();
+ }
+ finally {
+ cache1.destroy();
+ cache2.destroy();
}
assertEquals(2, writeCnt.get());
@@ -204,16 +219,18 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
- try (
- IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
- IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
- ) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache1.put(1, 1);
- cache2.put(2, 2);
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2);
- tx.commit();
- }
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+
+ tx.commit();
+ }
+ finally {
+ cache1.destroy();
+ cache2.destroy();
}
try (Connection conn = DriverManager.getConnection(URL)) {
@@ -232,25 +249,27 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
- try (
- IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
- IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
- ) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache1.put(1, 1);
- cache2.put(2, 2);
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2);
- tx.commit();
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
- assert false : "Exception was not thrown.";
- }
- catch (IgniteException e) {
- CacheWriterException we = X.cause(e, CacheWriterException.class);
+ tx.commit();
- assertNotNull(we);
+ assert false : "Exception was not thrown.";
+ }
+ catch (IgniteException e) {
+ CacheWriterException we = X.cause(e, CacheWriterException.class);
+
+ assertNotNull(we);
- assertEquals("Expected failure.", we.getMessage());
- }
+ assertEquals("Expected failure.", we.getMessage());
+ }
+ finally {
+ cache1.destroy();
+ cache2.destroy();
}
try (Connection conn = DriverManager.getConnection(URL)) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a233fa00/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
index 7b01f0f..bc6b443 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
@@ -92,31 +92,33 @@ public class GridCacheTxLoadFromStoreOnLockSelfTest extends GridCommonAbstractTe
cacheCfg.setBackups(backups);
cacheCfg.setLoadPreviousValue(true);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg)) {
- for (int i = 0; i < 10; i++)
- assertEquals((Integer)i, cache.get(i));
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg);
- cache.removeAll();
+ for (int i = 0; i < 10; i++)
+ assertEquals((Integer)i, cache.get(i));
- assertEquals(0, cache.size());
+ cache.removeAll();
- for (TransactionConcurrency conc : TransactionConcurrency.values()) {
- for (TransactionIsolation iso : TransactionIsolation.values()) {
- info("Checking transaction [conc=" + conc + ", iso=" + iso + ']');
+ assertEquals(0, cache.size());
- try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) {
- for (int i = 0; i < 10; i++)
- assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']',
- (Integer)i, cache.get(i));
+ for (TransactionConcurrency conc : TransactionConcurrency.values()) {
+ for (TransactionIsolation iso : TransactionIsolation.values()) {
+ info("Checking transaction [conc=" + conc + ", iso=" + iso + ']');
- tx.commit();
- }
+ try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) {
+ for (int i = 0; i < 10; i++)
+ assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']',
+ (Integer)i, cache.get(i));
- cache.removeAll();
- assertEquals(0, cache.size());
+ tx.commit();
}
+
+ cache.removeAll();
+ assertEquals(0, cache.size());
}
}
+
+ cache.destroy();
}
/**