You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/09 12:22:54 UTC
incubator-ignite git commit: # ignite-929
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-929 b6c7eaee3 -> adad6cc4f
# ignite-929
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/adad6cc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/adad6cc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/adad6cc4
Branch: refs/heads/ignite-929
Commit: adad6cc4f83fdb314a24f057eb64ceb6418da09f
Parents: b6c7eae
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 9 12:42:24 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 9 13:20:46 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheGateway.java | 23 --
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 98 +++--
.../processors/cache/IgniteCacheProxy.java | 406 +++++++++++++------
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../cache/CacheStopAndDestroySelfTest.java | 234 +++++++----
6 files changed, 496 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index df450d0..f2beb0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -35,9 +35,6 @@ public class GridCacheGateway<K, V> {
/** Stopped flag for dynamic caches. */
private volatile boolean stopped;
- /** Closed flag for dynamic caches. */
- private final ThreadLocal<Boolean> closed = new ThreadLocal<>();
-
/** */
private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
@@ -153,12 +150,6 @@ public class GridCacheGateway<K, V> {
throw new IllegalStateException("Cache has been stopped: " + ctx.name());
}
- if (closed.get() != null) {
- rwLock.readUnlock();
-
- throw new IllegalStateException("Cache has been closed: " + ctx.name());
- }
-
// Must unlock in case of unexpected errors to avoid
// deadlocks during kernal stop.
try {
@@ -245,20 +236,6 @@ public class GridCacheGateway<K, V> {
/**
*
*/
- public void open() {
- closed.remove();
- }
-
- /**
- *
- */
- public void close() {
- closed.set(Boolean.TRUE);
- }
-
- /**
- *
- */
public void onStopped() {
boolean interrupted = false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index af87685..4398b4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -156,16 +156,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Validate requests to check if event should trigger partition exchange.
for (DynamicCacheChangeRequest req : batch.requests()) {
- if (cctx.cache().dynamicCacheRegistered(req))
+ if (cctx.cache().exchangeNeeded(req))
valid.add(req);
else
cctx.cache().completeStartFuture(req);
}
if (!F.isEmpty(valid)) {
- exchId = exchangeId(n.id(),
- affinityTopologyVersion(e),
- e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
exchFut = exchangeFuture(exchId, e, valid);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e494cd4..f564cb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1390,7 +1390,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return {@code True} if change request was registered to apply.
*/
@SuppressWarnings("IfMayBeConditional")
- public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
+ public boolean exchangeNeeded(DynamicCacheChangeRequest req) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
if (desc != null) {
@@ -1519,20 +1519,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Stop request.
*/
public void blockGateway(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close();
- // Break the proxy before exchange future is done.
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
+ if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
+ // Break the proxy before exchange future is done.
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
- if (proxy != null)
- proxy.gate().block();
+ if (proxy != null) {
+ if (req.stop())
+ proxy.gate().block();
+ else
+ proxy.closeProxy();
+ }
+ }
}
/**
* @param req Request.
*/
private void stopGateway(DynamicCacheChangeRequest req) {
- assert req.stop() || req.close() : req;
+ assert req.stop() : req;
// Break the proxy before exchange future is done.
IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName()));
@@ -1601,6 +1607,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
registeredCaches.remove(masked, desc);
}
+ else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
+
+ if (proxy != null) {
+ if (proxy.context().affinityNode()) {
+ GridCacheAdapter<?, ?> cache = caches.get(masked);
+
+ if (cache != null)
+ jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
+ }
+ else
+ prepareCacheStop(req);
+ }
+ }
completeStartFuture(req);
}
@@ -2034,29 +2054,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName));
- // Closing gateway first.
- if (proxy != null)
- proxy.gate().close();
-
- CacheConfiguration cfg = ctx.cache().cacheConfiguration(cacheName);
-
- if (cfg.getCacheMode() == LOCAL)
- return dynamicDestroyCache(cacheName);
- else {
- GridCacheAdapter<?, ?> cache = caches.get(maskNull(cacheName));
-
- if (cache != null && !cache.context().affinityNode()) {
- GridCacheContext<?, ?> ctx = cache.context();
+ if (proxy == null || proxy.proxyClosed())
+ return new GridFinishedFuture<>(); // No-op.
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+ checkEmptyTransactions();
- t.close(true);
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
- return F.first(initiateCacheChanges(F.asList(t), false));
- }
+ t.close(true);
- return new GridFinishedFuture<>(); // No-op.
- }
+ return F.first(initiateCacheChanges(F.asList(t), false));
}
/**
@@ -2081,9 +2088,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// No-op.
fut.onDone();
else {
+ assert desc.cacheConfiguration() != null : desc;
+
+ if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) {
+ req.close(false);
+
+ req.stop(true);
+ }
+
IgniteUuid dynamicDeploymentId = desc.deploymentId();
- assert dynamicDeploymentId != null;
+ assert dynamicDeploymentId != null : desc;
// Save deployment ID to avoid concurrent stops.
req.deploymentId(dynamicDeploymentId);
@@ -2230,21 +2245,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.initiatingNodeId(),
req.nearCacheConfiguration() != null);
}
- else if (req.close()) {
- if (req.initiatingNodeId().equals(ctx.localNodeId())) {
- stopGateway(req);
-
- prepareCacheStop(req);
- }
-
- ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
-
- completeStartFuture(req);
- }
else {
+ assert req.stop() || req.close() : req;
+
if (desc == null) {
- // If local node initiated start, fail the start future.
- DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+ // If local node initiated start, finish future.
+ DynamicCacheStartFuture changeFut =
+ (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) {
// No-op.
@@ -2254,9 +2261,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return;
}
- desc.onCancelled();
+ if (req.stop()) {
+ desc.onCancelled();
- ctx.discovery().removeCacheFilter(req.cacheName());
+ ctx.discovery().removeCacheFilter(req.cacheName());
+ }
+ else
+ ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
}
}
}
@@ -2728,9 +2739,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cache == null)
cache = startJCache(cacheName, failIfNotStarted);
- if (cache != null)
- cache.gate().open();
-
return (IgniteCacheProxy<K, V>)cache;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 3a5e27f..080502c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -171,19 +171,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public CacheMetrics metrics() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().metrics();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public CacheMetrics metrics(ClusterGroup grp) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
@@ -202,19 +206,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public CacheMetricsMXBean mxBean() {
- CacheOperationContext prev = gate.enter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().mxBean();
}
finally {
- gate.leave(prev);
+ onLeave(gate, prev);
}
}
@@ -230,19 +236,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Nullable @Override public Cache.Entry<K, V> randomEntry() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().randomEntry();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
@@ -251,7 +261,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -262,7 +272,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withNoRetries() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
boolean noRetries = opCtx != null && opCtx.noRetries();
@@ -280,14 +292,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -296,7 +310,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
ctx.cache().globalLoadCache(p, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -307,7 +321,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -316,7 +332,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.localLoadCache(p, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -327,7 +343,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -339,7 +357,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndPutIfAbsent(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -359,13 +377,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean isLocalLocked(K key, boolean byCurrThread) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -517,7 +537,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <R> QueryCursor<R> query(Query<R> qry) {
A.notNull(qry, "qry");
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -558,7 +580,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw new CacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -589,7 +611,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localEntries(peekModes);
@@ -598,37 +622,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public QueryMetrics queryMetrics() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.context().queries().metrics();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localEvict(Collection<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.evictAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localPeek(key, peekModes, null);
@@ -637,20 +667,22 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localPromote(Set<? extends K> keys) throws CacheException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.promoteAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -660,7 +692,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public int size(CachePeekMode... peekModes) throws CacheException {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -675,13 +709,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public int localSize(CachePeekMode... peekModes) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localSize(peekModes);
@@ -690,14 +726,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public V get(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -709,7 +747,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.get(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -720,7 +758,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -732,7 +772,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -743,7 +783,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -755,7 +797,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAllOutTx(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -769,7 +811,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
*/
public Map<K, V> getAll(Collection<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -781,7 +825,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -796,19 +840,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Entry set.
*/
public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.entrySetx(filter);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public boolean containsKey(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -820,13 +868,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.containsKey(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public boolean containsKeys(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -838,7 +888,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.containsKeys(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -848,7 +898,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
boolean replaceExisting,
@Nullable final CompletionListener completionLsnr
) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting);
@@ -869,14 +921,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void put(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -896,7 +950,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.put(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -907,7 +961,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -919,7 +975,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndPut(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -930,7 +986,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> map) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -939,7 +997,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.putAll(map);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -950,7 +1008,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -962,7 +1022,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.putIfAbsent(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -973,7 +1033,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean remove(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -985,7 +1047,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.remove(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -996,7 +1058,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean remove(K key, V oldVal) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1008,7 +1072,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.remove(key, oldVal);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1019,7 +1083,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndRemove(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1031,7 +1097,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndRemove(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1042,7 +1108,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1054,7 +1122,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.replace(key, oldVal, newVal);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1065,7 +1133,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1077,7 +1147,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.replace(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1088,7 +1158,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1100,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndReplace(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1111,7 +1183,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void removeAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1120,7 +1194,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.removeAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1130,7 +1204,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void removeAll() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1142,13 +1218,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clear(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1160,13 +1238,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1178,13 +1258,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clear() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1196,32 +1278,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localClear(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.clearLocally(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localClearAll(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
for (K key : keys)
delegate.clearLocally(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1229,7 +1315,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1255,7 +1343,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1267,7 +1355,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1293,7 +1383,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1303,10 +1393,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
- EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1318,7 +1410,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1331,7 +1423,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
CacheEntryProcessor<K, V, T> entryProcessor,
Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1343,7 +1437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1356,7 +1450,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1368,7 +1464,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(map, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1395,7 +1491,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void destroy() {
- if (!onEnterIfNoStop())
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
return;
IgniteInternalFuture<?> fut;
@@ -1404,7 +1502,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name());
}
finally {
- onLeave();
+ onLeave(gate);
}
try {
@@ -1417,7 +1515,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void close() {
- if (!onEnterIfNoStop())
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
return;
IgniteInternalFuture<?> fut;
@@ -1426,7 +1526,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
fut = ctx.kernalContext().cache().dynamicCloseCache(ctx.name());
}
finally {
- onLeave();
+ onLeave(gate);
}
try {
@@ -1439,14 +1539,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean isClosed() {
- if (!onEnterIfNoStop())
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
return true;
try {
return ctx.kernalContext().cache().context().closed(ctx);
}
finally {
- onLeave();
+ onLeave(gate);
}
}
@@ -1470,7 +1572,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
@@ -1479,13 +1583,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
@@ -1494,19 +1600,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public Iterator<Cache.Entry<K, V>> iterator() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().igniteIterator();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1539,7 +1647,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Projection for portable objects.
*/
public <K1, V1> IgniteCache<K1, V1> keepPortable() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
CacheOperationContext opCtx0 =
@@ -1557,7 +1667,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1565,7 +1675,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Cache with skip store enabled.
*/
public IgniteCache<K, V> skipStore() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
boolean skip = opCtx != null && opCtx.skipStore();
@@ -1587,7 +1699,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1614,10 +1726,68 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
+ * @return {@code True} if proxy was closed.
+ */
+ public boolean proxyClosed() {
+ return !gate.getClass().equals(GridCacheGateway.class);
+ }
+
+ /**
+ * Closes this proxy instance.
+ */
+ public void closeProxy() {
+ gate = new GridCacheGateway<K, V>(ctx) {
+ @Override public void enter() {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Override public boolean enterIfNotStopped() {
+ return false;
+ }
+
+ @Override public boolean enterIfNotStoppedNoLock() {
+ return false;
+ }
+
+ @Override public void leaveNoLock() {
+ assert false;
+ }
+
+ @Override public void leave() {
+ assert false;
+ }
+
+ @Nullable @Override public CacheOperationContext enter(@Nullable CacheOperationContext opCtx) {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Nullable @Override public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Override public void leave(CacheOperationContext prev) {
+ assert false;
+ }
+
+ @Override public void leaveNoLock(CacheOperationContext prev) {
+ assert false;
+ }
+
+ @Override public void block() {
+ // No-op.
+ }
+
+ @Override public void onStopped() {
+ // No-op.
+ }
+ };
+ }
+
+ /**
* @param opCtx Cache operation context to guard.
* @return Previous projection set on this thread.
*/
- private CacheOperationContext onEnter(CacheOperationContext opCtx) {
+ private CacheOperationContext onEnter(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
if (lock)
return gate.enter(opCtx);
else
@@ -1629,7 +1799,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
*
* @return {@code True} if enter successful.
*/
- private boolean onEnterIfNoStop() {
+ private boolean onEnterIfNoStop(GridCacheGateway<K, V> gate) {
if (lock)
return gate.enterIfNotStopped();
else
@@ -1639,7 +1809,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/**
* @param opCtx Operation context to guard.
*/
- private void onLeave(CacheOperationContext opCtx) {
+ private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
if (lock)
gate.leave(opCtx);
else
@@ -1649,7 +1819,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/**
* On leave.
*/
- private void onLeave() {
+ private void onLeave(GridCacheGateway<K, V> gate) {
if (lock)
gate.leave();
else
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index bae55ca..5701749 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -911,7 +911,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*/
private void blockGateways() {
for (DynamicCacheChangeRequest req : reqs) {
- if (req.stop())
+ if (req.stop() || req.close())
cctx.cache().blockGateway(req);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/adad6cc4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
index 536ddc3..79c2a2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
@@ -40,6 +40,8 @@ import javax.cache.configuration.*;
import java.util.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
/**
* Checks stop and destroy methods behavior.
*/
@@ -123,7 +125,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
CacheConfiguration cfg = defaultCacheConfiguration();
cfg.setName(CACHE_NAME_DHT);
- cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setCacheMode(PARTITIONED);
cfg.setNearConfiguration(null);
return cfg;
@@ -136,7 +138,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
CacheConfiguration cfg = defaultCacheConfiguration();
cfg.setName(CACHE_NAME_CLIENT);
- cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setCacheMode(PARTITIONED);
cfg.setNearConfiguration(null);
return cfg;
@@ -149,7 +151,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
CacheConfiguration cfg = defaultCacheConfiguration();
cfg.setName(CACHE_NAME_NEAR);
- cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setCacheMode(PARTITIONED);
cfg.setNearConfiguration(new NearCacheConfiguration());
return cfg;
@@ -162,7 +164,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
CacheConfiguration cfg = defaultCacheConfiguration();
cfg.setName(CACHE_NAME_LOC);
- cfg.setCacheMode(CacheMode.LOCAL);
+ cfg.setCacheMode(LOCAL);
cfg.setNearConfiguration(null);
return cfg;
@@ -175,6 +177,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
public void testDhtDoubleDestroy() throws Exception {
dhtDestroy();
+
dhtDestroy();
}
@@ -186,29 +189,36 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
private void dhtDestroy() throws Exception {
grid(0).getOrCreateCache(getDhtConfig());
- assert grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL) == null;
+ assertNull(grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL));
grid(0).cache(CACHE_NAME_DHT).put(KEY_VAL, KEY_VAL);
- assert grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL).equals(KEY_VAL);
+ assertEquals(KEY_VAL, grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(1).cache(CACHE_NAME_DHT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(2).cache(CACHE_NAME_DHT).get(KEY_VAL));
- //DHT Destroy. Cache should be removed from each node.
+ assertFalse(grid(0).configuration().isClientMode());
+
+ // DHT Destroy. Cache should be removed from each node.
grid(0).cache(CACHE_NAME_DHT).destroy();
try {
grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored0) {
try {
grid(1).cache(CACHE_NAME_DHT).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored1) {
try {
grid(2).cache(CACHE_NAME_DHT).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored2) {
// No-op
@@ -224,6 +234,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
public void testClientDoubleDestroy() throws Exception {
clientDestroy();
+
clientDestroy();
}
@@ -235,29 +246,36 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
private void clientDestroy() throws Exception {
grid(0).getOrCreateCache(getClientConfig());
- assert grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL) == null;
+ assertNull(grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
grid(0).cache(CACHE_NAME_CLIENT).put(KEY_VAL, KEY_VAL);
- assert grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL).equals(KEY_VAL);
+ assertEquals(KEY_VAL, grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(1).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(2).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
- //DHT Destroy from client node. Cache should be removed from each node.
+ // DHT Destroy from client node. Cache should be removed from each node.
+
+ assertTrue(grid(2).configuration().isClientMode());
grid(2).cache(CACHE_NAME_CLIENT).destroy();// Client node.
try {
grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored0) {
try {
grid(1).cache(CACHE_NAME_CLIENT).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored1) {
try {
grid(2).cache(CACHE_NAME_CLIENT).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored2) {
// No-op
@@ -273,6 +291,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
public void testNearDoubleDestroy() throws Exception {
nearDestroy();
+
nearDestroy();
}
@@ -286,31 +305,34 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration());
- assert grid(0).cache(CACHE_NAME_NEAR).get(KEY_VAL) == null;
- assert grid(2).cache(CACHE_NAME_NEAR).get(KEY_VAL) == null;
+ assertNull(grid(0).cache(CACHE_NAME_NEAR).get(KEY_VAL));
+ assertNull(grid(2).cache(CACHE_NAME_NEAR).get(KEY_VAL));
grid(2).cache(CACHE_NAME_NEAR).put(KEY_VAL, KEY_VAL);
grid(0).cache(CACHE_NAME_NEAR).put(KEY_VAL, "near-test");
- assert grid(2).cache(CACHE_NAME_NEAR).localPeek(KEY_VAL).equals("near-test");
+ assertEquals("near-test", grid(2).cache(CACHE_NAME_NEAR).localPeek(KEY_VAL));
- //Local destroy. Cache should be removed from each node.
+ // Local destroy. Cache should be removed from each node.
grid(2).cache(CACHE_NAME_NEAR).destroy();
try {
grid(0).cache(CACHE_NAME_NEAR).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored0) {
try {
grid(1).cache(CACHE_NAME_NEAR).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored1) {
try {
grid(2).cache(CACHE_NAME_NEAR).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored2) {
// No-op
@@ -326,6 +348,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*/
public void testLocalDoubleDestroy() throws Exception {
localDestroy();
+
localDestroy();
}
@@ -352,17 +375,20 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
try {
grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored0) {
try {
grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored1) {
try {
grid(2).cache(CACHE_NAME_LOC).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored2) {
// No-op
@@ -377,43 +403,48 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testDhtClose() throws Exception {
- IgniteCache<String, String> dhtCache0 = grid(0).getOrCreateCache(getDhtConfig());
+ IgniteCache<Integer, Integer> dhtCache0 = grid(0).getOrCreateCache(getDhtConfig());
- assert dhtCache0.get(KEY_VAL) == null;
+ final Integer key = primaryKey(dhtCache0);
- dhtCache0.put(KEY_VAL, KEY_VAL);
+ assertNull(dhtCache0.get(key));
- assert dhtCache0.get(KEY_VAL).equals(KEY_VAL);
+ dhtCache0.put(key, key);
- //DHT Close. No-op.
+ assertEquals(key, dhtCache0.get(key));
- IgniteCache<String, String> dhtCache1 = grid(1).cache(CACHE_NAME_DHT);
- IgniteCache<String, String> dhtCache2 = grid(2).cache(CACHE_NAME_DHT);
+ // DHT Close. No-op.
+
+ IgniteCache<Integer, Integer> dhtCache1 = grid(1).cache(CACHE_NAME_DHT);
+ IgniteCache<Integer, Integer> dhtCache2 = grid(2).cache(CACHE_NAME_DHT);
dhtCache0.close();
try {
- dhtCache0.get(KEY_VAL);// Not affected, but can not be taken.
- assert false;
+ dhtCache0.get(key);// Not affected, but can not be taken.
+
+ fail();
}
catch (IllegalStateException ignored) {
// No-op
}
- assert dhtCache1.get(KEY_VAL).equals(KEY_VAL);// Not affected.
- assert dhtCache2.get(KEY_VAL).equals(KEY_VAL);// Not affected.
+ assertEquals(key, dhtCache1.get(key)); // Not affected.
+ assertEquals(key, dhtCache2.get(key));// Not affected.
- //DHT Creation after closed.
+ // DHT Creation after closed.
- dhtCache0 = grid(0).cache(CACHE_NAME_DHT);
+ IgniteCache<Integer, Integer> dhtCache0New = grid(0).cache(CACHE_NAME_DHT);
- assert dhtCache0.get(KEY_VAL).equals(KEY_VAL);// Not affected, can be taken since cache reopened.
+ assertNotSame(dhtCache0, dhtCache0New);
- dhtCache2.put(KEY_VAL, KEY_VAL + "recreated");
+ assertEquals(key, dhtCache0New.get(key)); // Not affected, can be taken since cache reopened.
- assert dhtCache0.get(KEY_VAL).equals(KEY_VAL + "recreated");
+ dhtCache2.put(key, key + 1);
- //Check close at last node.
+ assertEquals((Object)(key + 1), dhtCache0New.get(key));
+
+ // Check close at last node.
stopAllGrids(true);
@@ -421,18 +452,19 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
dhtCache0 = grid(0).getOrCreateCache(getDhtConfig());
- assert dhtCache0.get(KEY_VAL) == null;
+ assertNull(dhtCache0.get(key));
- dhtCache0.put(KEY_VAL, KEY_VAL);
+ dhtCache0.put(key, key);
- assert dhtCache0.get(KEY_VAL).equals(KEY_VAL);
+ assertEquals(key, dhtCache0.get(key));
// Closing last node.
dhtCache0.close();
try {
- dhtCache0.get(KEY_VAL);// Can not be taken.
- assert false;
+ dhtCache0.get(key);// Can not be taken.
+
+ fail();
}
catch (IllegalStateException ignored) {
// No-op
@@ -441,7 +473,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
// Reopening cache.
dhtCache0 = grid(0).cache(CACHE_NAME_DHT);
- assert dhtCache0.get(KEY_VAL).equals(KEY_VAL);// Entry not loosed.
+ assertEquals(key, dhtCache0.get(key)); // Entry not loosed.
}
/**
@@ -457,9 +489,16 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_DHT);
IgniteCache<String, String> cache2 = grid(2).cache(CACHE_NAME_DHT);
- assert cache0.get(KEY_VAL) == null || cache0.get(KEY_VAL).equals(curVal);
- assert cache1.get(KEY_VAL) == null || cache1.get(KEY_VAL).equals(curVal);
- assert cache2.get(KEY_VAL) == null || cache2.get(KEY_VAL).equals(curVal);
+ if (i == 0) {
+ assert cache0.get(KEY_VAL) == null;
+ assert cache1.get(KEY_VAL) == null;
+ assert cache2.get(KEY_VAL) == null;
+ }
+ else {
+ assert cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL).equals(curVal);
+ }
curVal = KEY_VAL + curVal;
@@ -486,7 +525,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
assert cache0.get(KEY_VAL).equals(KEY_VAL);
- //DHT Close from client node. Should affect only client node.
+ // DHT Close from client node. Should affect only client node.
IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_CLIENT);
IgniteCache<String, String> cache2 = grid(2).cache(CACHE_NAME_CLIENT);
@@ -500,22 +539,25 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
try {
cache2.get(KEY_VAL);// Affected.
+
assert false;
}
catch (IllegalStateException ignored) {
// No-op
}
- //DHT Creation from client node after closed.
- cache2 = grid(2).cache(CACHE_NAME_CLIENT);
+ // DHT Creation from client node after closed.
+ IgniteCache<String, String> cache2New = grid(2).cache(CACHE_NAME_CLIENT);
- assert cache2.get(KEY_VAL).equals(KEY_VAL);
+ assertNotSame(cache2, cache2New);
+
+ assert cache2New.get(KEY_VAL).equals(KEY_VAL);
cache0.put(KEY_VAL, KEY_VAL + "recreated");
assert cache0.get(KEY_VAL).equals(KEY_VAL + "recreated");
assert cache1.get(KEY_VAL).equals(KEY_VAL + "recreated");
- assert cache2.get(KEY_VAL).equals(KEY_VAL + "recreated");
+ assert cache2New.get(KEY_VAL).equals(KEY_VAL + "recreated");
}
/**
@@ -531,9 +573,16 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
IgniteCache<String, String> cache0 = grid(0).cache(CACHE_NAME_CLIENT);
IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_CLIENT);
- assert cache0.get(KEY_VAL) == null || cache0.get(KEY_VAL).equals(curVal);
- assert cache1.get(KEY_VAL) == null || cache1.get(KEY_VAL).equals(curVal);
- assert cache2.get(KEY_VAL) == null || cache2.get(KEY_VAL).equals(curVal);
+ if (i == 0) {
+ assert cache0.get(KEY_VAL) == null;
+ assert cache1.get(KEY_VAL) == null;
+ assert cache2.get(KEY_VAL) == null;
+ }
+ else {
+ assert cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL).equals(curVal);
+ }
curVal = KEY_VAL + curVal;
@@ -556,17 +605,17 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
public void testNearClose() throws Exception {
IgniteCache<String, String> cache0 = grid(0).getOrCreateCache(getNearConfig());
- //GridDhtTxPrepareRequest requests to Client node will be counted.
+ // GridDhtTxPrepareRequest requests to Client node will be counted.
CountingTxRequestsToClientNodeTcpCommunicationSpi.nodeFilter = grid(2).context().localNodeId();
- //Near Close from client node.
+ // Near Close from client node.
IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_NEAR);
IgniteCache<String, String> cache2 = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration());
assert cache2.get(KEY_VAL) == null;
- //Subscribing to events.
+ // Subscribing to events.
cache2.put(KEY_VAL, KEY_VAL);
CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.set(0);
@@ -584,12 +633,12 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.set(0);
- //Should not produce messages to client node.
+ // Should not produce messages to client node.
cache0.put(KEY_VAL, KEY_VAL + 0);
U.sleep(1000);
- //Ensure near cache was NOT automatically updated.
+ // Ensure near cache was NOT automatically updated.
assert CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.get() == 0;
assert cache0.get(KEY_VAL).equals(KEY_VAL + 0);// Not affected.
@@ -597,26 +646,29 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
try {
cache2.get(KEY_VAL);// Affected.
+
assert false;
}
catch (IllegalArgumentException | IllegalStateException ignored) {
// No-op
}
- //Near Creation from client node after closed.
+ // Near Creation from client node after closed.
- cache2 = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration());
+ IgniteCache<String, String> cache2New = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration());
- //Subscribing to events.
- cache2.put(KEY_VAL, KEY_VAL);
+ assertNotSame(cache2, cache2New);
- assert cache2.localPeek(KEY_VAL).equals(KEY_VAL);
+ // Subscribing to events.
+ cache2New.put(KEY_VAL, KEY_VAL);
+
+ assert cache2New.localPeek(KEY_VAL).equals(KEY_VAL);
cache0.put(KEY_VAL, KEY_VAL + "recreated");
assert cache0.get(KEY_VAL).equals(KEY_VAL + "recreated");
assert cache1.get(KEY_VAL).equals(KEY_VAL + "recreated");
- assert cache2.localPeek(KEY_VAL).equals(KEY_VAL + "recreated");
+ assert cache2New.localPeek(KEY_VAL).equals(KEY_VAL + "recreated");
}
/**
@@ -629,8 +681,10 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
grid(0).getOrCreateCache(getNearConfig());
+ NearCacheConfiguration nearCfg = new NearCacheConfiguration();
+
for (int i = 0; i < 3; i++) {
- try (IgniteCache<String, String> cache2 = grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration())) {
+ try (IgniteCache<String, String> cache2 = grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, nearCfg)) {
IgniteCache<String, String> cache0 = grid(0).cache(CACHE_NAME_NEAR);
IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_NEAR);
@@ -670,23 +724,26 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
assert grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 0);
assert grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 1);
- //Local close. Same as Local destroy.
+ // Local close. Same as Local destroy.
grid(1).cache(CACHE_NAME_LOC).close();
try {
grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored0) {
try {
grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored1) {
try {
grid(2).cache(CACHE_NAME_LOC).get(KEY_VAL);
- assert false;
+
+ fail();
}
catch (IllegalArgumentException | IllegalStateException ignored2) {
// No-op
@@ -694,7 +751,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
}
}
- //Local creation after closed.
+ // Local creation after closed.
grid(0).getOrCreateCache(getLocalConfig());
@@ -742,7 +799,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
*
* @throws Exception If failed.
*/
- public void _testConcurrentCloseSetWithTry() throws Exception {
+ public void testConcurrentCloseSetWithTry() throws Exception {
final AtomicInteger a1 = new AtomicInteger();
final AtomicInteger a2 = new AtomicInteger();
final AtomicInteger a3 = new AtomicInteger();
@@ -750,21 +807,29 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
Thread t1 = new Thread(new Runnable() {
@Override public void run() {
+ Thread.currentThread().setName("test-thread-1");
+
closeWithTry(a1, 0);
}
});
Thread t2 = new Thread(new Runnable() {
@Override public void run() {
+ Thread.currentThread().setName("test-thread-2");
+
closeWithTry(a2, 0);
}
});
Thread t3 = new Thread(new Runnable() {
@Override public void run() {
+ Thread.currentThread().setName("test-thread-3");
+
closeWithTry(a3, 2);
}
});
Thread t4 = new Thread(new Runnable() {
@Override public void run() {
+ Thread.currentThread().setName("test-thread-4");
+
closeWithTry(a4, 2);
}
});
@@ -778,9 +843,12 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
t3.start();
t4.start();
- U.sleep(1000);
-
- stop = true;
+ try {
+ U.sleep(1000);
+ }
+ finally {
+ stop = true;
+ }
t1.join();
t2.join();
@@ -835,10 +903,12 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
cache.close();
+ cache.close();
+
try {
cache.get("key");
- assert false;
+ fail();
}
catch (IllegalStateException e) {
// No-op;