You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/03/05 03:54:54 UTC
[2/4] incubator-ignite git commit: IGNITE-45 - WIP
IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03758eaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03758eaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03758eaa
Branch: refs/heads/ignite-45
Commit: 03758eaabdaf3b0cb69016fac66ec85f95d06a60
Parents: 32e26d3
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Mar 4 17:41:54 2015 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Mar 4 17:41:54 2015 -0800
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 11 +-
.../cache/DynamicCacheChangeBatch.java | 45 +++
.../cache/DynamicCacheChangeRequest.java | 120 ++++++++
.../cache/DynamicCacheDescriptor.java | 31 +-
.../processors/cache/GridCacheContext.java | 19 +-
.../processors/cache/GridCacheGateway.java | 16 ++
.../processors/cache/GridCacheIoManager.java | 16 ++
.../GridCachePartitionExchangeManager.java | 26 +-
.../processors/cache/GridCacheProcessor.java | 281 ++++++++++++++-----
.../cache/GridCacheSharedContext.java | 7 +-
.../processors/cache/IgniteCacheProxy.java | 7 +
.../GridDhtPartitionsExchangeFuture.java | 54 ++--
.../cache/IgniteDynamicCacheStartSelfTest.java | 87 +++++-
13 files changed, 597 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d891149..04ca3d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -218,7 +218,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Adds dynamic cache filters.
+ * Adds dynamic cache filter.
*
* @param cacheName Cache name.
* @param filter Cache filter.
@@ -229,6 +229,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
assert old == null;
}
+ /**
+ * Removes the dynamic cache filter registered under the given cache name, if there is one.
+ *
+ * @param cacheName Name of the cache whose entry is dropped from {@code dynamicCacheFilters}.
+ */
+ public void removeDynamicCacheFilter(String cacheName) {
+ dynamicCacheFilters.remove(cacheName);
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
discoOrdered = discoOrdered();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
new file mode 100644
index 0000000..d657707
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Batch of dynamic cache change (start/stop) requests that travels as a single discovery custom event.
+ */
+public class DynamicCacheChangeBatch implements Serializable {
+ /** Change requests carried by this batch; assigned once in the constructor. */
+ private final Collection<DynamicCacheChangeRequest> reqs;
+
+ /**
+ * @param reqs Requests to carry; the collection is stored as is, without copying.
+ */
+ public DynamicCacheChangeBatch(
+ Collection<DynamicCacheChangeRequest> reqs
+ ) {
+ this.reqs = reqs;
+ }
+
+ /**
+ * @return The same collection of change requests that was passed to the constructor.
+ */
+ public Collection<DynamicCacheChangeRequest> requests() {
+ return reqs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
new file mode 100644
index 0000000..2a61200
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+
+/**
+ * Cache start/stop request: a start request carries a configuration and a node filter, a stop request only a name.
+ */
+public class DynamicCacheChangeRequest implements Serializable {
+ /** Deployment ID: random for start requests, unset for stop requests until {@link #deploymentId(IgniteUuid)}. */
+ private IgniteUuid deploymentId;
+
+ /** Stop cache name ({@code null} for start requests). */
+ @GridToStringExclude
+ private final String stopName;
+
+ /** Cache start configuration ({@code null} for stop requests). */
+ private final CacheConfiguration startCfg;
+
+ /** Cache start node filter ({@code null} for stop requests). */
+ private final IgnitePredicate<ClusterNode> startNodeFltr;
+
+ /**
+ * Constructor creates cache start request and assigns it a freshly generated random deployment ID.
+ *
+ * @param startCfg Start cache configuration.
+ * @param startNodeFltr Start node filter.
+ */
+ public DynamicCacheChangeRequest(
+ CacheConfiguration startCfg,
+ IgnitePredicate<ClusterNode> startNodeFltr
+ ) {
+ this.startCfg = startCfg;
+ this.startNodeFltr = startNodeFltr;
+
+ deploymentId = IgniteUuid.randomUuid();
+ stopName = null;
+ }
+
+ /**
+ * Constructor creates cache stop request; the deployment ID is left unset by this constructor.
+ *
+ * @param stopName Cache stop name.
+ */
+ public DynamicCacheChangeRequest(String stopName) {
+ this.stopName = stopName;
+
+ startCfg = null;
+ startNodeFltr = null;
+ }
+
+ /**
+ * @return Deployment ID, or {@code null} for a stop request whose deployment ID has not been set yet.
+ */
+ public IgniteUuid deploymentId() {
+ return deploymentId;
+ }
+
+ /**
+ * @param deploymentId Deployment ID to associate with this request.
+ */
+ public void deploymentId(IgniteUuid deploymentId) {
+ this.deploymentId = deploymentId;
+ }
+
+ /**
+ * @return {@code True} if this is a start request, i.e. it was created with a start configuration.
+ */
+ public boolean isStart() {
+ return startCfg != null;
+ }
+
+ /**
+ * @return Name from the start configuration for a start request, otherwise the stop name (never throws NPE).
+ */
+ public String cacheName() {
+ return startCfg != null ? startCfg.getName() : stopName;
+ }
+
+ /**
+ * @return Cache configuration, or {@code null} for a stop request.
+ */
+ public CacheConfiguration startCacheConfiguration() {
+ return startCfg;
+ }
+
+ /**
+ * @return Node filter, or {@code null} for a stop request.
+ */
+ public IgnitePredicate<ClusterNode> startNodeFilter() {
+ return startNodeFltr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 6a6e227..e7f7e2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -23,14 +23,12 @@ import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import java.io.*;
-
/**
* Cache start descriptor.
*/
-public class DynamicCacheDescriptor implements Serializable {
+public class DynamicCacheDescriptor {
/** Cache start ID. */
- private IgniteUuid startId;
+ private IgniteUuid deploymentId;
/** Cache configuration. */
@GridToStringExclude
@@ -40,21 +38,24 @@ public class DynamicCacheDescriptor implements Serializable {
@GridToStringExclude
private IgnitePredicate<ClusterNode> nodeFilter;
+ /** Cancelled flag. */
+ private boolean cancelled;
+
/**
* @param cacheCfg Cache configuration.
* @param nodeFilter Node filter.
+ * @param deploymentId Deployment ID stored in this descriptor.
*/
- public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid startId) {
+ public DynamicCacheDescriptor(CacheConfiguration cacheCfg, IgnitePredicate<ClusterNode> nodeFilter, IgniteUuid deploymentId) {
this.cacheCfg = cacheCfg;
this.nodeFilter = nodeFilter;
- this.startId = startId;
+ this.deploymentId = deploymentId;
}
/**
* @return Start ID.
*/
- public IgniteUuid startId() {
- return startId;
+ public IgniteUuid deploymentId() {
+ return deploymentId;
}
/**
@@ -71,6 +72,20 @@ public class DynamicCacheDescriptor implements Serializable {
return nodeFilter;
}
+ /**
+ * Sets cancelled flag; the flag is never reset by this class once it has been raised.
+ */
+ public void onCancelled() {
+ cancelled = true; // NOTE(review): plain (non-volatile) field - confirm that readers run on the same thread.
+ }
+
+ /**
+ * @return Cancelled flag: {@code true} once {@link #onCancelled()} has been called, {@code false} before that.
+ */
+ public boolean cancelled() {
+ return cancelled;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheDescriptor.class, this, "cacheName", cacheCfg.getName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 159fd1b..557b6e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -205,6 +205,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Conflict resolver. */
private GridCacheVersionAbstractConflictResolver conflictRslvr;
+ /** Dynamic cache deployment ID. */
+ private IgniteUuid dynamicDeploymentId;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -328,6 +331,20 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @param dynamicDeploymentId Dynamic deployment ID to remember for this cache context.
+ */
+ void dynamicDeploymentId(IgniteUuid dynamicDeploymentId) {
+ this.dynamicDeploymentId = dynamicDeploymentId;
+ }
+
+ /**
+ * @return Dynamic deployment ID; presumably {@code null} when the package-private setter was never called.
+ */
+ public IgniteUuid dynamicDeploymentId() {
+ return dynamicDeploymentId;
+ }
+
+ /**
* Initialize conflict resolver after all managers are started.
*/
void initConflictResolver() {
@@ -1033,7 +1050,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* Sets default affinity key mapper.
*/
public void defaultAffMapper(CacheAffinityKeyMapper dfltAffMapper) {
- this.affMapper = dfltAffMapper;
+ affMapper = dfltAffMapper;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 2de235a..8969471 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -30,6 +30,9 @@ public class GridCacheGateway<K, V> {
/** Context. */
private final GridCacheContext<K, V> ctx;
+ /** Stopped flag for dynamic caches. */
+ private volatile boolean stopped;
+
/**
* @param ctx Cache context.
*/
@@ -49,6 +52,9 @@ public class GridCacheGateway<K, V> {
// Must unlock in case of unexpected errors to avoid
// deadlocks during kernal stop.
try {
+ if (stopped)
+ throw new IllegalStateException("Dynamic cache has been concurrently stopped: " + ctx.name());
+
ctx.kernalContext().gateway().readLock();
}
catch (IllegalStateException e) {
@@ -110,6 +116,9 @@ public class GridCacheGateway<K, V> {
// Must unlock in case of unexpected errors to avoid
// deadlocks during kernal stop.
try {
+ if (stopped)
+ throw new IllegalStateException("Dynamic cache has been concurrently stopped: " + ctx.name());
+
ctx.kernalContext().gateway().readLock();
// Set thread local projection per call.
@@ -152,4 +161,11 @@ public class GridCacheGateway<K, V> {
ctx.kernalContext().gateway().readUnlock();
}
}
+
+ /**
+ * Marks this gateway as stopped by raising the volatile {@code stopped} flag.
+ */
+ public void onStopped() {
+ stopped = true; // Checked before the kernal gateway read lock is taken; entry then fails with IllegalStateException.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b504e21..7a3ada7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -599,6 +599,22 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
}
/**
+ * @param cacheId ID of the cache whose handlers are removed; must not be {@code 0} (asserted).
+ */
+ public void removeHandlers(int cacheId) {
+ assert cacheId != 0;
+
+ // Drop whatever idxClsHandlers holds for this cache ID.
+ idxClsHandlers.remove(cacheId);
+
+ // Then purge every clsHandlers entry whose listener key belongs to this cache.
+ Iterator<ListenerKey> keys = clsHandlers.keySet().iterator();
+
+ while (keys.hasNext()) {
+ if (keys.next().cacheId == cacheId)
+ keys.remove();
+ }
+ }
+
+ /**
* @param lsnr Listener to add.
*/
public void addDisconnectListener(GridDisconnectListener lsnr) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d38161e..f7f1f9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -142,16 +142,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else {
DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
- if (customEvt.data() instanceof DynamicCacheDescriptor) {
- DynamicCacheDescriptor desc = (DynamicCacheDescriptor)customEvt.data();
+ if (customEvt.data() instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data();
- // Check if this event should trigger partition exchange.
- if (cctx.cache().dynamicCacheRegistered(desc)) {
+ Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
+
+ // Validate requests to check if event should trigger partition exchange.
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ if (cctx.cache().dynamicCacheRegistered(req))
+ valid.add(req);
+ }
+
+ if (!F.isEmpty(valid)) {
exchId = exchangeId(n.id(),
new AffinityTopologyVersion(e.topologyVersion(), ++minorTopVer),
e.type());
- exchFut = exchangeFuture(exchId, e, desc);
+ exchFut = exchangeFuture(exchId, e, valid);
}
}
}
@@ -588,11 +595,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @return Exchange future.
*/
GridDhtPartitionsExchangeFuture<K, V> exchangeFuture(GridDhtPartitionExchangeId exchId,
- @Nullable DiscoveryEvent discoEvt, @Nullable DynamicCacheDescriptor startDesc) {
+ @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs) {
GridDhtPartitionsExchangeFuture<K, V> fut;
GridDhtPartitionsExchangeFuture<K, V> old = exchFuts.addx(
- fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, startDesc));
+ fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, reqs));
if (old != null)
fut = old;
@@ -615,11 +622,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
fut.cleanUp();
}
}
-
- DynamicCacheDescriptor desc = exchFut.dynamicCacheDescriptor();
-
- if (desc != null)
- cctx.cache().onCacheStartFinished(desc);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9a8cbcb..b7ac0af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -25,10 +25,7 @@ import org.apache.ignite.cache.affinity.rendezvous.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.events.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.cache.datastructures.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -109,7 +106,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private IgniteTransactionsImpl transactions;
/** Pending cache starts. */
- private ConcurrentMap<String, IgniteInternalFuture> pendingStarts = new ConcurrentHashMap<>();
+ private ConcurrentMap<String, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();
/** Dynamic caches. */
private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new ConcurrentHashMap<>();
@@ -567,8 +564,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() {
@Override public void apply(Serializable evt) {
- if (evt instanceof DynamicCacheDescriptor)
- onCacheStartRequested((DynamicCacheDescriptor)evt);
+ if (evt instanceof DynamicCacheChangeBatch)
+ onCacheChangeRequested((DynamicCacheChangeBatch)evt);
}
});
@@ -1170,23 +1167,29 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param desc Descriptor to check.
- * @return {@code True} if cache was registered for start and exchange future should be created.
+ * @param req Request to check.
+ * @return {@code True} if change request was registered to apply.
*/
- public boolean dynamicCacheRegistered(DynamicCacheDescriptor desc) {
- return dynamicCaches.get(desc.cacheConfiguration().getName()) == desc;
+ public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
+ DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName());
+
+ // Same deployment, and the descriptor state matches the request kind:
+ // not cancelled for a start request, cancelled for a stop request.
+ return desc != null && desc.deploymentId().equals(req.deploymentId()) && desc.cancelled() != req.isStart();
}
/**
- * @param startDesc Start descriptor.
+ * @param req Start request.
*/
- public void onCacheStartExchange(DynamicCacheDescriptor startDesc) throws IgniteCheckedException {
- CacheConfiguration cfg = new CacheConfiguration(startDesc.cacheConfiguration());
+ public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException {
+ assert req.isStart();
+
+ CacheConfiguration cfg = new CacheConfiguration(req.startCacheConfiguration());
initialize(cfg);
GridCacheContext cacheCtx = createCache(cfg);
+ cacheCtx.dynamicDeploymentId(req.deploymentId());
+
sharedCtx.addCacheContext(cacheCtx);
startCache(cacheCtx.cache());
@@ -1196,26 +1199,58 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param req Stop request; passing a start request trips the assertion below.
+ */
+ public void prepareCacheStop(DynamicCacheChangeRequest req) {
+ assert !req.isStart();
+
+ // Break the proxy before exchange future is done.
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(req.cacheName());
+
+ if (proxy != null)
+ proxy.gate().onStopped();
+
+ GridCacheAdapter<?, ?> cache = caches.remove(req.cacheName()); // May be null: no such cache on this node.
+
+ if (cache != null) {
+ GridCacheContext<?, ?> ctx = cache.context();
+
+ sharedCtx.removeCacheContext(ctx);
+
+ assert req.deploymentId().equals(ctx.dynamicDeploymentId()); // Evaluated after the context was removed.
+
+ onKernalStop(cache, true);
+ stopCache(cache, true);
+ }
+ }
+
+ /**
* Callback invoked when first exchange future for dynamic cache is completed.
*
- * @param startDesc Cache start descriptor.
+ * @param req Change request.
*/
@SuppressWarnings("unchecked")
- public void onCacheStartFinished(DynamicCacheDescriptor startDesc) {
- GridCacheAdapter<?, ?> cache = caches.get(startDesc.cacheConfiguration().getName());
+ public void onExchangeDone(DynamicCacheChangeRequest req) {
+ if (req.isStart()) {
+ GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());
- if (cache != null)
- jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+ // Publish the proxy only if the cache is present in the local caches map.
+ if (cache != null)
+ jCacheProxies.put(cache.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+ }
+ else {
+ DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName());
- CacheConfiguration ccfg = startDesc.cacheConfiguration();
+ // Drop the descriptor only if it is the cancelled descriptor of this very deployment.
+ if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
+ dynamicCaches.remove(req.cacheName(), desc);
+ }
- DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());
+ DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.cacheName());
- if (fut != null && fut.startId().equals(startDesc.startId())) {
- fut.onDone();
+ assert req.deploymentId() != null;
+ assert fut == null || fut.deploymentId != null;
- pendingStarts.remove(ccfg.getName(), fut);
- }
+ // Complete the pending future only when it belongs to the same deployment as the request.
+ if (fut != null && fut.deploymentId().equals(req.deploymentId()))
+ fut.onDone();
}
/**
@@ -1285,69 +1320,162 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (nodeFilter == null)
nodeFilter = F.alwaysTrue();
- DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, IgniteUuid.fromUuid(ctx.localNodeId()));
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ccfg, nodeFilter);
- try {
- for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) {
- if (ccfg0.getName().equals(ccfg.getName()))
- return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
- "(a cache with the same name is already configured): " + ccfg.getName()));
- }
+ return F.first(initiateCacheChanges(F.asList(req)));
+ }
- if (caches.containsKey(ccfg.getName()))
- return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
- "(a cache with the same name is already started): " + ccfg.getName()));
+ /**
+ * @param cacheName Cache name to stop.
+ * @return Future that will be completed when cache is stopped.
+ */
+ public IgniteInternalFuture<?> dynamicStopCache(String cacheName) {
+ // Single stop request goes through the common path; the first returned future is handed back.
+ return F.first(initiateCacheChanges(F.asList(new DynamicCacheChangeRequest(cacheName))));
+ }
- IgniteInternalFuture<?> old = pendingStarts.putIfAbsent(ccfg.getName(), fut);
+ /**
+ * Validates the given change requests locally and broadcasts the ones that passed validation as one batch.
+ * Requests that fail validation (or need no action) get their future completed right away and are not sent.
+ *
+ * @param reqs Requests.
+ * @return Collection of futures, one per request, in the iteration order of {@code reqs}.
+ */
+ public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs) {
+ Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
- if (old != null)
- return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
- "(a cache with the same name is already started): " + ccfg.getName()));
+ Collection<DynamicCacheChangeRequest> sendReqs = new ArrayList<>(reqs.size());
- ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, nodeFilter, fut.startId()));
+ for (DynamicCacheChangeRequest req : reqs) {
+ DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, req.cacheName(), req.deploymentId());
- return fut;
- }
- catch (Exception e) {
- fut.onDone(e);
+ try {
+ for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) {
+ // Compare from the request side so that a configured cache with a null name
+ // (presumably the default cache) does not cause a NullPointerException here.
+ if (req.cacheName().equals(ccfg0.getName())) {
+ Exception ex = new IgniteCheckedException("Failed to " +
+ (req.isStart() ? "start" : "stop") + " cache " +
+ "(a cache with the same name is manually configured): " + ccfg0.getName());
+
+ fut.onDone(ex);
+
+ break;
+ }
+ }
+
+ if (fut.isDone())
+ continue;
- // Safety.
- pendingStarts.remove(ccfg.getName(), fut);
+ if (req.isStart()) {
+ if (caches.containsKey(req.cacheName())) {
+ // Fail the future with the exception itself: passing a finished future
+ // would complete this future successfully with that future as its result.
+ fut.onDone(new IgniteCheckedException("Failed to start cache " +
+ "(a cache with the same name is already started): " + req.cacheName()));
+ }
+ }
+ else {
+ GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());
- return fut;
+ if (cache == null)
+ // No-op.
+ fut.onDone();
+ else {
+ IgniteUuid dynamicDeploymentId = cache.context().dynamicDeploymentId();
+
+ assert dynamicDeploymentId != null;
+
+ // Save deployment ID to avoid concurrent stops.
+ req.deploymentId(dynamicDeploymentId);
+ fut.deploymentId = dynamicDeploymentId;
+ }
+ }
+
+ if (fut.isDone())
+ continue;
+
+ DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent(req.cacheName(), fut);
+
+ if (old != null) {
+ if (req.isStart()) {
+ fut.onDone(new IgniteCheckedException("Failed to start cache " +
+ "(a cache with the same name is already being started or stopped): " + req.cacheName()));
+ }
+ else {
+ // A change for this cache is already pending: hand out its future instead of sending again.
+ fut = old;
+
+ continue;
+ }
+ }
+
+ if (fut.isDone())
+ continue;
+
+ sendReqs.add(req);
+ }
+ catch (Exception e) {
+ fut.onDone(e);
+ }
+ finally {
+ // Runs on every path, including the 'continue' statements above.
+ res.add(fut);
+ }
}
+
+ // Nothing to broadcast when every request was completed or rejected locally.
+ if (!sendReqs.isEmpty())
+ ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sendReqs));
+
+ return res;
}
/**
* Callback invoked from discovery thread when cache deployment request is received.
*
- * @param startDesc Cache start descriptor.
+ * @param batch Change request batch; every request of the batch is processed independently.
*/
- private void onCacheStartRequested(DynamicCacheDescriptor startDesc) {
- CacheConfiguration ccfg = startDesc.cacheConfiguration();
+ private void onCacheChangeRequested(DynamicCacheChangeBatch batch) {
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ if (req.isStart()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
- // Check if cache with the same name was concurrently started form different node.
- if (dynamicCaches.containsKey(ccfg.getName())) {
- // If local node initiated start, fail the start future.
- DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());
+ // Check if cache with the same name was concurrently started from different node.
+ if (dynamicCaches.containsKey(ccfg.getName())) {
+ // If local node initiated start, fail the start future.
+ DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get(ccfg.getName());
- if (startFut != null && startFut.startId().equals(startDesc.startId())) {
- assert !startFut.syncNotify();
+ if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) {
+ assert !startFut.syncNotify();
- startFut.onDone(new IgniteCheckedException("Failed to start cache " +
- "(a cache with the same name is already started): " + ccfg.getName()));
+ startFut.onDone(new IgniteCheckedException("Failed to start cache " +
+ "(a cache with the same name is already started): " + ccfg.getName()));
+ }
- pendingStarts.remove(ccfg.getName(), startFut);
+ // Skip only this request: the remaining requests of the batch must still be processed.
+ continue;
+ }
+
+ DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ccfg, req.startNodeFilter(),
+ req.deploymentId());
+
+ DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc);
+
+ ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter());
+
+ assert old == null :
+ "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
}
+ else {
+ DynamicCacheDescriptor desc = dynamicCaches.get(req.cacheName());
- return;
- }
+ if (desc == null) {
+ // If local node initiated stop, complete the pending future.
+ DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(req.cacheName());
+
+ if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) {
+ assert !changeFut.syncNotify();
- DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc);
+ // No-op.
+ changeFut.onDone();
+ }
+
+ // Skip only this request: the remaining requests of the batch must still be processed.
+ continue;
+ }
- ctx.discovery().addDynamicCacheFilter(ccfg.getName(), startDesc.nodeFilter());
+ desc.onCancelled();
- assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
+ ctx.discovery().removeDynamicCacheFilter(req.cacheName());
+ }
+ }
}
/**
@@ -2028,25 +2156,40 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
- private static class DynamicCacheStartFuture extends GridFutureAdapter<Object> {
+ public class DynamicCacheStartFuture extends GridFutureAdapter<Object> {
/** Start ID. */
- private IgniteUuid startId;
+ private IgniteUuid deploymentId;
+
+ /** Cache name; used as the key when this future removes itself from {@code pendingFuts}. */
+ private String cacheName;
/**
* @param ctx Kernal context.
+ * @param cacheName Cache name.
+ * @param deploymentId Deployment ID.
*/
- private DynamicCacheStartFuture(GridKernalContext ctx, IgniteUuid startId) {
+ private DynamicCacheStartFuture(GridKernalContext ctx, String cacheName, IgniteUuid deploymentId) {
// Start future can be completed from discovery thread, notification must NOT be sync.
super(ctx, false);
- this.startId = startId;
+ this.deploymentId = deploymentId;
+ this.cacheName = cacheName;
}
/**
* @return Start ID.
*/
- private IgniteUuid startId() {
- return startId;
+ public IgniteUuid deploymentId() {
+ return deploymentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ // Two-argument remove: a different future mapped to the same cache name is left in place.
+ pendingFuts.remove(cacheName, this);
+
+ return true;
+ }
+
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index aadb153..ef00131 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -143,7 +143,12 @@ public class GridCacheSharedContext<K, V> {
* @param cacheCtx Cache context to remove.
*/
public void removeCacheContext(GridCacheContext cacheCtx) {
- ctxMap.remove(cacheCtx.cacheId(), cacheCtx);
+ int cacheId = cacheCtx.cacheId();
+
+ ctxMap.remove(cacheId, cacheCtx);
+
+ // Safely clean up the message listeners.
+ ioMgr.removeHandlers(cacheId);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b77b8db..cddae65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -102,6 +102,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return ctx;
}
+ /**
+ * @return Gateway.
+ */
+ public GridCacheGateway<K, V> gate() {
+ return gate;
+ }
+
/** {@inheritDoc} */
@Override public CacheMetrics metrics() {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4741bf4..5a66b21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -142,8 +142,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
/** Logger. */
private IgniteLogger log;
- /** Dynamic cache start descriptor. */
- private DynamicCacheDescriptor startDesc;
+ /** Dynamic cache change requests. */
+ private Collection<DynamicCacheChangeRequest> reqs;
/**
* Dummy future created to trigger reassignments if partition
@@ -204,7 +204,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
GridCacheSharedContext<K, V> cctx,
ReadWriteLock busyLock,
GridDhtPartitionExchangeId exchId,
- DynamicCacheDescriptor startDesc
+ Collection<DynamicCacheChangeRequest> reqs
) {
super(cctx.kernalContext());
@@ -220,7 +220,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
this.cctx = cctx;
this.busyLock = busyLock;
this.exchId = exchId;
- this.startDesc = startDesc;
+ this.reqs = reqs;
log = cctx.logger(getClass());
@@ -392,13 +392,6 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
}
/**
- * @return Dynamic cache descriptor.
- */
- public DynamicCacheDescriptor dynamicCacheDescriptor() {
- return startDesc;
- }
-
- /**
* @return Init future.
*/
IgniteInternalFuture<?> initFuture() {
@@ -442,8 +435,8 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
// will return corresponding nodes.
U.await(evtLatch);
- if (startDesc != null)
- startCache();
+ if (!F.isEmpty(reqs))
+ startCaches();
assert discoEvt != null;
@@ -494,6 +487,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
if (log.isDebugEnabled())
log.debug("After waiting for partition release future: " + this);
+ if (!F.isEmpty(reqs))
+ stopCaches();
+
for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -571,12 +567,23 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
}
/**
- * Starts dynamic cache.
+ * Starts dynamic caches.
*/
- private void startCache() throws IgniteCheckedException {
- assert startDesc != null;
+ private void startCaches() throws IgniteCheckedException {
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (req.isStart())
+ ctx.cache().prepareCacheStart(req);
+ }
+ }
- ctx.cache().onCacheStartExchange(startDesc);
+ /**
+ * Stops dynamic caches.
+ */
+ private void stopCaches() {
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (!req.isStart())
+ ctx.cache().prepareCacheStop(req);
+ }
}
/**
@@ -673,8 +680,17 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Aff
cacheCtx.affinity().cleanUpCache(res.topologyVersion() - 10);
}
- if (startDesc != null && F.eq(startDesc.cacheConfiguration().getName(), cacheCtx.name()))
- cacheCtx.preloader().onInitialExchangeComplete(err);
+ if (!F.isEmpty(reqs)) {
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (req.isStart() && F.eq(cacheCtx.name(), req.cacheName()))
+ cacheCtx.preloader().onInitialExchangeComplete(err);
+ }
+ }
+ }
+
+ if (!F.isEmpty(reqs)) {
+ for (DynamicCacheChangeRequest req : reqs)
+ cctx.cache().onExchangeDone(req);
}
cctx.exchange().onExchangeDone(this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03758eaa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 70af7c3..ac92e72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -32,7 +32,11 @@ import java.util.concurrent.*;
/**
* Test for dynamic cache start.
*/
+@SuppressWarnings("unchecked")
public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
+ /** Name of the dynamic cache started and stopped by the tests. */
+ private static final String CACHE_NAME = "TestDynamicCache";
+
/**
* @return Number of nodes for this test.
*/
@@ -53,18 +57,18 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testStartCacheMultithreadedSameNode() throws Exception {
- final Collection<IgniteInternalFuture<?>> futs = new ConcurrentLinkedDeque<>();
-
+ public void testStartStopCacheMultithreadedSameNode() throws Exception {
final IgniteKernal kernal = (IgniteKernal)grid(0);
+ final Collection<IgniteInternalFuture<?>> futs = new ConcurrentLinkedDeque<>();
+
int threadNum = 20;
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
CacheConfiguration ccfg = new CacheConfiguration();
- ccfg.setName("TestCacheName");
+ ccfg.setName(CACHE_NAME);
futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()));
@@ -92,6 +96,21 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
assertEquals(1, succeeded);
assertEquals(threadNum - 1, failed);
+
+ futs.clear();
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME));
+
+ return null;
+ }
+ }, threadNum, "cache-stopper");
+
+ assertEquals(threadNum, futs.size());
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
}
/**
@@ -106,7 +125,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
@Override public Object call() throws Exception {
CacheConfiguration ccfg = new CacheConfiguration();
- ccfg.setName("TestCacheName2");
+ ccfg.setName(CACHE_NAME);
IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount()));
@@ -136,20 +155,35 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
assertEquals(1, succeeded);
assertEquals(threadNum - 1, failed);
+
+ futs.clear();
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount()));
+
+ futs.add(kernal.context().cache().dynamicStopCache(CACHE_NAME));
+
+ return null;
+ }
+ }, threadNum, "cache-stopper");
+
+ assertEquals(threadNum, futs.size());
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
}
/**
* @throws Exception If failed.
*/
- public void testStartCacheSimple() throws Exception {
+ public void testStartStopCacheSimple() throws Exception {
final IgniteKernal kernal = (IgniteKernal)grid(0);
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- String cacheName = "TestCacheName3";
-
- ccfg.setName(cacheName);
+ ccfg.setName(CACHE_NAME);
kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()).get();
@@ -159,12 +193,41 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
f.get();
- assertNotNull(grid(g).jcache(cacheName));
+ assertNotNull(grid(g).jcache(CACHE_NAME));
}
- grid(0).jcache(cacheName).put("1", "1");
+ grid(0).jcache(CACHE_NAME).put("1", "1");
+
+ for (int g = 0; g < nodeCount(); g++)
+ assertEquals("1", grid(g).jcache(CACHE_NAME).get("1"));
+
+ // Grab caches before stop.
+ final IgniteCache[] caches = new IgniteCache[nodeCount()];
for (int g = 0; g < nodeCount(); g++)
- assertEquals("1", grid(g).jcache(cacheName).get("1"));
+ caches[g] = grid(g).jcache(CACHE_NAME);
+
+ kernal.context().cache().dynamicStopCache(CACHE_NAME).get();
+
+ for (int g = 0; g < nodeCount(); g++) {
+ final IgniteKernal kernal0 = (IgniteKernal)grid(g);
+
+ final int idx = g;
+
+ for (IgniteInternalFuture f : kernal0.context().cache().context().exchange().exchangeFutures())
+ f.get();
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return kernal0.jcache(CACHE_NAME);
+ }
+ }, IllegalArgumentException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return caches[idx].get("1");
+ }
+ }, IllegalStateException.class, null);
+ }
}
}