You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/10 15:56:30 UTC
[1/8] incubator-ignite git commit: GG-10507 Need to add
StreamReceiver to .NET
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 00d151b92 -> ce2caffdd
GG-10507 Need to add StreamReceiver to .NET
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/90580d8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/90580d8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/90580d8f
Branch: refs/heads/ignite-901
Commit: 90580d8ff47b4f2a56b794c2a8596d0f9db28310
Parents: bee6f68
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Jul 9 21:18:10 2015 +0300
Committer: Pavel Tupitsyn <pt...@gridgain.com>
Committed: Thu Jul 9 21:18:10 2015 +0300
----------------------------------------------------------------------
.../internal/processors/datastreamer/DataStreamProcessor.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90580d8f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 9e53bb5..54478f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -255,6 +255,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
try {
updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+
+ if (updater != null)
+ ctx.resource().injectGeneric(updater);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
[7/8] incubator-ignite git commit: # ignite-901
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index ce59995..2d7d0ce 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -18,7 +18,6 @@
package org.apache.ignite.testsuites;
import junit.framework.*;
-import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
[8/8] incubator-ignite git commit: # ignite-901
Posted by sb...@apache.org.
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ce2caffd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ce2caffd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ce2caffd
Branch: refs/heads/ignite-901
Commit: ce2caffdde641b6722dfc53877fe5b4633aef2a1
Parents: 782c235
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 12:15:01 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 16:56:14 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 4 +
.../ignite/internal/GridKernalContextImpl.java | 13 +-
.../ignite/internal/GridKernalGateway.java | 3 +-
.../apache/ignite/internal/IgniteKernal.java | 3 +-
.../internal/cluster/IgniteClusterImpl.java | 2 +-
.../discovery/GridDiscoveryManager.java | 119 +++++----
.../affinity/GridAffinityAssignmentCache.java | 2 +
.../processors/cache/GridCacheAdapter.java | 267 +++++++------------
.../processors/cache/GridCacheIoManager.java | 1 +
.../processors/cache/GridCacheProcessor.java | 4 +-
.../cache/GridCacheSharedContext.java | 7 +-
.../cache/GridCacheSharedManagerAdapter.java | 1 +
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../CacheDataStructuresManager.java | 1 +
.../continuous/CacheContinuousQueryHandler.java | 10 +-
.../cache/transactions/IgniteTxManager.java | 4 +-
.../continuous/GridContinuousProcessor.java | 9 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 15 ++
.../communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 59 ++--
.../GridDeploymentManagerStopSelfTest.java | 8 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 148 ++++++++--
.../IgniteCacheQuerySelfTestSuite.java | 1 -
23 files changed, 378 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index fb0a157..65e0644 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -120,12 +120,16 @@ public interface GridComponent {
@Nullable public DiscoveryDataExchangeType discoveryDataType();
/**
+ * Client disconnected callback.
+ *
* @param reconnectFut Reconnect future.
* @throws IgniteCheckedException If failed.
*/
public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException;
/**
+ * Client reconnected callback.
+ *
* @param clusterRestarted Cluster restarted flag.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 4a60e28..fd8b50c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -70,7 +70,6 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.IgniteSystemProperties.*;
-import static org.apache.ignite.internal.GridKernalState.*;
import static org.apache.ignite.internal.IgniteComponentType.*;
/**
@@ -306,6 +305,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
private MarshallerContextImpl marshCtx;
/** */
+ private ClusterNode locNode;
+
+ /** */
private volatile boolean disconnected;
/**
@@ -330,6 +332,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
* @param restExecSvc REST executor service.
+ * @param plugins Plugin providers.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("TypeMayBeWeakened")
@@ -506,9 +509,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
return ((IgniteKernal)grid).isStopping();
}
- /** */
- private ClusterNode locNode;
-
/** {@inheritDoc} */
@Override public UUID localNodeId() {
if (locNode != null)
@@ -918,7 +918,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** {@inheritDoc} */
@Override public boolean clientDisconnected() {
- return locNode.isClient() && disconnected;
+ if (locNode == null)
+ locNode = discoMgr != null ? discoMgr.localNode() : null;
+
+ return locNode != null ? (locNode.isClient() && disconnected) : false;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
index 957174a..1d50aa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal;
-import org.apache.ignite.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.jetbrains.annotations.*;
@@ -116,7 +115,7 @@ public interface GridKernalGateway {
/**
* Disconnected callback.
*
- * @return Reconnect future.
+ * @return Reconnect future.
*/
@Nullable public GridFutureAdapter<?> onDisconnected();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 22338cc..4718d75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -439,8 +439,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
assert cfg != null;
return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() {
- @Override
- public String apply(Map.Entry<String, ?> e) {
+ @Override public String apply(Map.Entry<String, ?> e) {
return e.getKey() + ", " + e.getValue().toString();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index 246eab5..0287ca7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -52,7 +52,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
@GridToStringExclude
private ConcurrentMap nodeLoc;
- /** */
+ /** Client reconnect future. */
private IgniteFuture<?> reconnecFut;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 3e8557d..044dc71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -326,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_PHY_RAM, totSysMemory);
- final DiscoverySpi spi = getSpi();
+ DiscoverySpi spi = getSpi();
discoOrdered = discoOrdered();
@@ -477,7 +477,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// If this is a local join event, just save it and do not notify listeners.
if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
if (gridStartTime == 0)
- gridStartTime = spi.getGridStartTime();
+ gridStartTime = getSpi().getGridStartTime();
DiscoveryEvent discoEvt = new DiscoveryEvent();
@@ -515,9 +515,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
assert locNode.isClient() : locNode;
assert node.isClient() : node;
- boolean clusterRestarted = gridStartTime != spi.getGridStartTime();
+ boolean clusterRestarted = gridStartTime != getSpi().getGridStartTime();
- gridStartTime = spi.getGridStartTime();
+ gridStartTime = getSpi().getGridStartTime();
((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
@@ -1198,6 +1198,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
try {
return getSpi().pingNode(nodeId);
}
+ catch (IgniteException e) {
+ return false;
+ }
finally {
busyLock.leaveBusy();
}
@@ -1580,9 +1583,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param msg Custom message.
+ * @throws IgniteCheckedException If failed.
*/
- public void sendCustomEvent(DiscoveryCustomMessage msg) {
- getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
+ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedException {
+ try {
+ getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
}
/**
@@ -1679,55 +1688,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
).start();
}
- /**
- * Method is called when any discovery event occurs.
- *
- * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
- * @param topVer Topology version.
- * @param node Remote node this event is connected with.
- * @param topSnapshot Topology snapshot.
- */
- @SuppressWarnings("RedundantTypeArguments")
- private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
- assert node != null;
-
- if (ctx.event().isRecordable(type)) {
- DiscoveryEvent evt = new DiscoveryEvent();
-
- evt.node(ctx.discovery().localNode());
- evt.eventNode(node);
- evt.type(type);
-
- evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter));
-
- if (type == EVT_NODE_METRICS_UPDATED)
- evt.message("Metrics were updated: " + node);
-
- else if (type == EVT_NODE_JOINED)
- evt.message("Node joined: " + node);
-
- else if (type == EVT_NODE_LEFT)
- evt.message("Node left: " + node);
-
- else if (type == EVT_NODE_FAILED)
- evt.message("Node failed: " + node);
-
- else if (type == EVT_NODE_SEGMENTED)
- evt.message("Node segmented: " + node);
-
- else if (type == EVT_CLIENT_NODE_DISCONNECTED)
- evt.message("Client node disconnected: " + node);
-
- else if (type == EVT_CLIENT_NODE_RECONNECTED)
- evt.message("Client node reconnected: " + node);
-
- else
- assert false;
-
- ctx.event().record(evt);
- }
- }
-
/** Worker for network segment checks. */
private class SegmentCheckWorker extends GridWorker {
/** */
@@ -1818,6 +1778,55 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Method is called when any discovery event occurs.
+ *
+ * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
+ * @param topVer Topology version.
+ * @param node Remote node this event is connected with.
+ * @param topSnapshot Topology snapshot.
+ */
+ @SuppressWarnings("RedundantTypeArguments")
+ private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
+ assert node != null;
+
+ if (ctx.event().isRecordable(type)) {
+ DiscoveryEvent evt = new DiscoveryEvent();
+
+ evt.node(ctx.discovery().localNode());
+ evt.eventNode(node);
+ evt.type(type);
+
+ evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter));
+
+ if (type == EVT_NODE_METRICS_UPDATED)
+ evt.message("Metrics were updated: " + node);
+
+ else if (type == EVT_NODE_JOINED)
+ evt.message("Node joined: " + node);
+
+ else if (type == EVT_NODE_LEFT)
+ evt.message("Node left: " + node);
+
+ else if (type == EVT_NODE_FAILED)
+ evt.message("Node failed: " + node);
+
+ else if (type == EVT_NODE_SEGMENTED)
+ evt.message("Node segmented: " + node);
+
+ else if (type == EVT_CLIENT_NODE_DISCONNECTED)
+ evt.message("Client node disconnected: " + node);
+
+ else if (type == EVT_CLIENT_NODE_RECONNECTED)
+ evt.message("Client node reconnected: " + node);
+
+ else
+ assert false;
+
+ ctx.event().record(evt);
+ }
+ }
+
+ /**
* @param type Event type.
* @param topVer Topology version.
* @param node Node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 178226d..d5c2b1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -130,6 +130,8 @@ public class GridAffinityAssignmentCache {
/**
* Kernal stop callback.
+ *
+ * @param err Error.
*/
public void onKernalStop(IgniteCheckedException err) {
stopErr = err;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index e70d8e8..d2a730a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -909,12 +909,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return keySet((CacheEntryPredicate[]) null);
+ return keySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@Override public Set<K> keySetx() {
- return keySetx((CacheEntryPredicate[]) null);
+ return keySetx((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@@ -1220,8 +1220,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override
- public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
@@ -1263,12 +1262,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
final IgniteBiInClosure<KeyCacheObject, Object> vis) {
return ctx.closures().callLocalSafe(new GPC<Object>() {
- @Nullable
- @Override
- public Object call() {
+ @Nullable @Override public Object call() {
try {
ctx.store().loadAll(tx, keys, vis);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw new GridClosureException(e);
}
@@ -1470,9 +1468,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.config().getInterceptor() != null)
fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
- @Override
- public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
- return (V) ctx.config().getInterceptor().onGet(key, f.get());
+ @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
+ return (V)ctx.config().getInterceptor().onGet(key, f.get());
}
});
@@ -1984,14 +1981,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<V>() {
- @Override
- public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter)
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
+ .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2049,13 +2044,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(drMap.size() == 1) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllDrAsync(ctx, drMap).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAllConflict [drMap=" + drMap + ']';
}
});
@@ -2070,13 +2063,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override
- public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.putAllDrAsync(ctx, drMap);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAllConflictAsync [drMap=" + drMap + ']';
}
});
@@ -2093,9 +2084,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable
- @Override
- public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
@@ -2127,14 +2116,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable
- @Override
- public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
- @Override
- public EntryProcessor apply(K k) {
+ @Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2175,8 +2161,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, EntryProcessorResult<T>>() {
- @Override
- public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2206,8 +2191,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) {
@Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
- @Override
- public EntryProcessor apply(K k) {
+ @Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2224,8 +2208,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
(IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override
- public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2258,8 +2241,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override
- public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2280,12 +2262,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
- @Nullable
- @Override
- public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
IgniteInternalFuture<GridCacheReturn> fut =
- tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>) map, args);
+ tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
return fut.get().value();
}
@@ -2333,14 +2313,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putxAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2370,13 +2348,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<V>(true) {
- @Override
- public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V) tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
+ @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2396,14 +2372,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override
- public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2428,13 +2402,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
Boolean stored = syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putxIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2459,14 +2431,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2537,13 +2507,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replacex [key=" + key + ", val=" + val + ']';
}
});
@@ -2559,14 +2527,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replacexAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2584,8 +2550,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(newVal);
return syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
@@ -2594,8 +2559,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -2658,13 +2622,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValues(m.values());
syncOp(new SyncInOp(m.size() == 1) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAll [map=" + m + ']';
}
});
@@ -2706,18 +2668,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
V prevVal = syncOp(new SyncOp<V>(true) {
- @Override
- public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
V ret = tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()).get().value();
if (ctx.config().getInterceptor() != null)
- return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
+ return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
return ret;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "remove [key=" + key + ']';
}
});
@@ -2740,15 +2700,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override
- public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
// TODO should we invoke interceptor here?
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ']';
}
});
@@ -2790,13 +2748,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
syncOp(new SyncInOp(keys.size() == 1) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAll [keys=" + keys + ']';
}
});
@@ -2818,13 +2774,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) {
- @Override
- public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).chain(RET2NULL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAllAsync [keys=" + keys + ']';
}
});
@@ -2847,13 +2801,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, CU.empty0()).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removex [key=" + key + ']';
}
});
@@ -2887,14 +2839,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, filter).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2913,21 +2863,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override
- public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- return (GridCacheReturn) tx.removeAllAsync(ctx,
+ return tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
ctx.equalsValArray(val)).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -2942,13 +2890,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(false) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllDrAsync(ctx, drMap).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAllConflict [drMap=" + drMap + ']';
}
});
@@ -2963,13 +2909,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override
- public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllDrAsync(ctx, drMap);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAllDrASync [drMap=" + drMap + ']';
}
});
@@ -2985,22 +2929,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override
- public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- return (GridCacheReturn) tx.putAllAsync(ctx,
- F.t(key, newVal),
- true,
- null,
- -1,
- ctx.equalsValArray(oldVal)).get();
+ return tx.putAllAsync(ctx,
+ F.t(key, newVal),
+ true,
+ null,
+ -1,
+ ctx.equalsValArray(oldVal)).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -3014,17 +2956,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override
- public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.removeAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
@@ -3033,8 +2975,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -3049,17 +2990,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override
- public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.putAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.putAllAsync(ctx,
F.t(key, newVal),
true,
null,
@@ -3069,8 +3010,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -3090,8 +3030,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
@@ -3100,8 +3039,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.equalsValArray(val)).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -3126,24 +3064,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
if (ctx.deploymentEnabled()) {
try {
ctx.deploy().registerClass(val);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
}
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
ctx.equalsValArray(val)).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -3754,16 +3691,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private IgniteCacheExpiryPolicy expiryPlc =
ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
- @Override
- public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
+ @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
V val = localPeek(lazyEntry.getKey(), CachePeekModes.ONHEAP_ONLY, expiryPlc);
return new CacheEntryImpl<>(lazyEntry.getKey(), val);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- } finally {
+ }
+ finally {
ctx.gate().leave(prev);
}
}
@@ -3787,20 +3725,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.execute();
return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
- @Override
- protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
+ @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue());
}
- @Override
- protected void remove(Cache.Entry<K, V> item) {
+ @Override protected void remove(Cache.Entry<K, V> item) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
GridCacheAdapter.this.remove(item.getKey());
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- } finally {
+ }
+ finally {
ctx.gate().leave(prev);
}
}
@@ -4457,8 +4395,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), deserializePortable).chain(
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override
- public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();
assert map.isEmpty() || map.size() == 1 : map.size();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 7e6b906..84e4dc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -894,6 +894,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/**
* @param cacheId Cache ID to remove handlers for.
+ * @param type Message type.
*/
public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) {
clsHandlers.remove(new ListenerKey(cacheId, type));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 46f9206..bf0f63b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2297,7 +2297,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
}
- catch (IgniteException e) {
+ catch (IgniteCheckedException e) {
err = e;
}
}
@@ -2957,7 +2957,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
}
- catch (IgniteException e) {
+ catch (IgniteCheckedException e) {
err = e;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 91a6042..4075d79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -176,6 +176,7 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param mgrs Managers list.
* @param txMgr Transaction manager.
* @param verMgr Version manager.
* @param mvccMgr MVCC manager.
@@ -221,6 +222,7 @@ public class GridCacheSharedContext<K, V> {
* Adds cache context to shared cache context.
*
* @param cacheCtx Cache context to add.
+ * @throws IgniteCheckedException If cache ID conflict detected.
*/
@SuppressWarnings("unchecked")
public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException {
@@ -315,7 +317,7 @@ public class GridCacheSharedContext<K, V> {
*/
public byte dataCenterId() {
// Data center ID is same for all caches, so grab the first one.
- GridCacheContext<K, V> cacheCtx = F.first(cacheContexts());
+ GridCacheContext<?, ?> cacheCtx = F.first(cacheContexts());
return cacheCtx.dataCenterId();
}
@@ -327,7 +329,7 @@ public class GridCacheSharedContext<K, V> {
if (preloadersStartFut == null) {
GridCompoundFuture<Object, Object> compound = null;
- for (GridCacheContext<K, V> cacheCtx : cacheContexts()) {
+ for (GridCacheContext<?, ?> cacheCtx : cacheContexts()) {
IgniteInternalFuture<Object> startFut = cacheCtx.preloader().startFuture();
if (!startFut.isDone()) {
@@ -636,6 +638,7 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param mgrs Managers list.
* @param mgr Manager to add.
* @return Added manager.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 3ad0759..6ad76ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -117,6 +117,7 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/**
+ * @param reconnect {@code True} if manager restarted after client reconnect.
* @throws IgniteCheckedException If failed.
*/
protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9767f49..6f2eed9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1774,7 +1774,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
assert false;
}
- @Override public void block() {
+ @Override public void stopped() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 3691ee6..f710105 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -107,6 +107,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
/**
* Client reconnect callback.
+ *
* @throws IgniteCheckedException If failed.
*/
public void onReconnected() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 2ed4341..879c30c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -396,6 +396,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryHandler.class, this);
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, cacheName);
out.writeObject(topic);
@@ -438,11 +443,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
taskHash = in.readInt();
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CacheContinuousQueryHandler.class, this);
- }
-
/**
* @param ctx Kernal context.
* @return Cache context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index caaa22d..82543c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -154,10 +154,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture reconnectFut) {
+ txFinishSync.onDisconnected(reconnectFut);
+
for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
rollbackTx(e.getValue());
-
- txFinishSync.onDisconnected(reconnectFut);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index defcd3f..daa9494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -511,7 +511,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
}
- catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
+ catch (IgniteCheckedException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
startFuts.remove(routineId);
locInfos.remove(routineId);
@@ -576,7 +576,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Unregister handler locally.
unregisterHandler(routineId, routine.hnd, true);
- ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+ try {
+ ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
if (ctx.isStopping())
fut.onDone();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index a49d85a..07b39bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -71,6 +71,9 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** Discovery listener. */
private GridLocalEventListener paramsLsnr;
+ /** Local node. */
+ private ClusterNode locNode;
+
/**
* Creates new adapter and initializes it from the current (this) class.
* SPI name will be initialized to the simple name of the class
@@ -112,6 +115,18 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
return ignite.cluster().localNode().id();
}
+ /**
+ * @return Local node.
+ */
+ protected ClusterNode getLocalNode() {
+ if (locNode != null)
+ return locNode;
+
+ locNode = getSpiContext().localNode();
+
+ return locNode;
+ }
+
/** {@inheritDoc} */
@Override public final String getIgniteHome() {
return ignite.configuration().getIgniteHome();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index d99a764..4fce6f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1717,7 +1717,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isTraceEnabled())
log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
- if (node.isLocal())
+ if (node.equals(getLocalNode()))
notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 404c71d..cad5435 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.*;
/**
*
@@ -265,13 +266,13 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
State state = this.state;
- if (spi.getSpiContext().isStopping() || state == State.STOPPED || state == State.SEGMENTED) {
+ if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(false);
return false;
}
- else if (state == State.DISCONNECTED) {
+ else if (state == DISCONNECTED) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(new IgniteClientDisconnectedCheckedException(null,
"Failed to ping node, client node disconnected."));
@@ -282,7 +283,7 @@ class ClientImpl extends TcpDiscoveryImpl {
timer.schedule(new TimerTask() {
@Override public void run() {
if (pingFuts.remove(nodeId, finalFut)) {
- if (ClientImpl.this.state == State.DISCONNECTED)
+ if (ClientImpl.this.state == DISCONNECTED)
finalFut.onDone(new IgniteClientDisconnectedCheckedException(null,
"Failed to ping node, client node disconnected."));
else
@@ -345,10 +346,10 @@ class ClientImpl extends TcpDiscoveryImpl {
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
State state = this.state;
- if (state == State.SEGMENTED)
+ if (state == SEGMENTED)
throw new IgniteException("Failed to send custom message: client is segmented.");
- if (state == State.DISCONNECTED)
+ if (state == DISCONNECTED)
throw new IgniteException("Failed to send custom message: client is disconnected.");
try {
@@ -981,10 +982,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- assert state == ClientImpl.State.DISCONNECTED
- || state == ClientImpl.State.CONNECTED
- || state == ClientImpl.State.STARTING : state;
-
boolean success = false;
Exception err = null;
@@ -1135,7 +1132,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@SuppressWarnings("InfiniteLoopStatement")
@Override protected void body() throws InterruptedException {
- state = ClientImpl.State.STARTING;
+ state = STARTING;
spi.stats.onJoinStarted();
@@ -1146,23 +1143,23 @@ class ClientImpl extends TcpDiscoveryImpl {
Object msg = queue.take();
if (msg == JOIN_TIMEOUT) {
- if (state == ClientImpl.State.STARTING) {
+ if (state == STARTING) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
"[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
break;
}
- else if (state == ClientImpl.State.DISCONNECTED) {
+ else if (state == DISCONNECTED) {
log.info("Rejoin timeout, will segment.");
- state = ClientImpl.State.SEGMENTED;
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
}
else if (msg == SPI_STOP) {
- state = ClientImpl.State.STOPPED;
+ state = STOPPED;
assert spi.getSpiContext().isStopping();
@@ -1182,7 +1179,7 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean join = joinLatch.getCount() > 0;
- if (spi.getSpiContext().isStopping() || (state == ClientImpl.State.SEGMENTED)) {
+ if (spi.getSpiContext().isStopping() || state == SEGMENTED) {
leaveLatch.countDown();
if (join) {
@@ -1209,10 +1206,10 @@ class ClientImpl extends TcpDiscoveryImpl {
reconnector = null;
if (spi.isClientReconnectDisabled()) {
- if (state != ClientImpl.State.SEGMENTED && state != ClientImpl.State.STOPPED) {
+ if (state != SEGMENTED && state != STOPPED) {
log.info("Reconnected failed, will segment.");
- state = ClientImpl.State.SEGMENTED;
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
@@ -1220,8 +1217,8 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
log.info("Reconnected failed, will try join.");
- if (state == ClientImpl.State.STARTING || state == ClientImpl.State.CONNECTED) {
- state = ClientImpl.State.DISCONNECTED;
+ if (state == STARTING || state == CONNECTED) {
+ state = DISCONNECTED;
nodeAdded = false;
@@ -1264,8 +1261,8 @@ class ClientImpl extends TcpDiscoveryImpl {
err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
if (err != null) {
- if (state == ClientImpl.State.DISCONNECTED) {
- state = ClientImpl.State.SEGMENTED;
+ if (state == DISCONNECTED) {
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
@@ -1298,9 +1295,9 @@ class ClientImpl extends TcpDiscoveryImpl {
* @throws InterruptedException If interrupted.
*/
private void tryJoin() throws InterruptedException {
- assert state == ClientImpl.State.DISCONNECTED || state == ClientImpl.State.STARTING : state;
+ assert state == DISCONNECTED || state == STARTING : state;
- boolean join = state == ClientImpl.State.STARTING;
+ boolean join = state == STARTING;
log.info("Try join topology with timeout: " + spi.joinTimeout);
@@ -1314,7 +1311,7 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
log.info("Send join request on rejoin failed, will segment.");
- state = ClientImpl.State.SEGMENTED;
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
@@ -1377,14 +1374,14 @@ class ClientImpl extends TcpDiscoveryImpl {
private boolean joining() {
ClientImpl.State state = ClientImpl.this.state;
- return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
+ return state == STARTING || state == DISCONNECTED;
}
/**
* @return {@code True} if client disconnected.
*/
private boolean disconnected() {
- return state == ClientImpl.State.DISCONNECTED;
+ return state == DISCONNECTED;
}
/**
@@ -1477,14 +1474,14 @@ class ClientImpl extends TcpDiscoveryImpl {
if (disconnected())
notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes);
+ else
+ spi.stats.onJoinFinished();
- state = ClientImpl.State.CONNECTED;
+ state = CONNECTED;
joinErr.set(null);;
joinLatch.countDown();
-
- spi.stats.onJoinFinished();
}
else if (log.isDebugEnabled())
log.debug("Discarding node add finished message (this message has already been processed) " +
@@ -1737,7 +1734,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- if (msg.verified() && state == ClientImpl.State.CONNECTED) {
+ if (msg.verified() && state == CONNECTED) {
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
@@ -1877,7 +1874,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
*
*/
- private enum State {
+ enum State {
/** */
STARTING,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
index b8f9ce1..62f5d41 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
@@ -98,13 +98,9 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest {
@Override public boolean unregister(String rsrcName) { return false; }
/** {@inheritDoc} */
- @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
- // No-op.
- }
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { /* No-op. */ }
/** {@inheritDoc} */
- @Override public void onClientReconnected(boolean clusterRestarted) {
- // No-op.
- }
+ @Override public void onClientReconnected(boolean clusterRestarted) { /* No-op. */ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 5838481..4d19f3e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -391,11 +391,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
- @Override
- public void apply(Socket sock) {
+ @Override public void apply(Socket sock) {
try {
latch.await();
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -753,11 +753,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
attachListeners(1, 1);
((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
- @Override
- public void apply(TcpDiscoveryAbstractMessage msg) {
+ @Override public void apply(TcpDiscoveryAbstractMessage msg) {
try {
Thread.sleep(1000000);
- } catch (InterruptedException ignored) {
+ }
+ catch (InterruptedException ignored) {
Thread.interrupted();
}
}
@@ -787,8 +787,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
G.ignite("client-0").compute().broadcast(F.noop());
assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return checkMetrics(3, 3, 1);
}
}, 10000));
@@ -798,8 +797,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
G.ignite("server-0").compute().broadcast(F.noop());
assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return checkMetrics(3, 3, 2);
}
}, 10000));
@@ -1204,8 +1202,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientsPerSrv = CLIENTS;
GridTestUtils.runMultiThreaded(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ @Override public Void call() throws Exception {
Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
clientNodeIds.add(g.cluster().localNode().id());
@@ -1297,7 +1294,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
if (changeTop)
clientSpi.pauseAll();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
assertEquals(0, disconnectLatch.getCount());
reconnectLatch.countDown();
@@ -1406,7 +1404,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
startClientNodes(1);
- Ignite srv = G.ignite("server-0");
+ final Ignite srv = G.ignite("server-0");
Ignite client = G.ignite("client-0");
TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
@@ -1461,7 +1459,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
log.info("Fail client connection.");
srvSpi.failClientReconnect.set(1_000_000);
- srvSpi.failNodeAdded.set(1_000_000);
+ srvSpi.skipNodeAdded = true;
clientSpi.brakeConnection();
}
@@ -1474,8 +1472,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertFalse(err.get());
- if (!failSrv)
+ if (!failSrv) {
await(srvFailedLatch);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srv.cluster().nodes().size() == 1;
+ }
+ }, 10_000);
+
+ checkNodes(1, 0);
+ }
}
/**
@@ -1485,10 +1492,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
netTimeout = 3000;
joinTimeout = 60_000;
- clientIpFinder = new TcpDiscoveryVmIpFinder();
-
- clientIpFinder.setAddresses(Collections.singleton("localhost:47500..47509"));
-
final CountDownLatch disconnectLatch = new CountDownLatch(1);
final CountDownLatch reconnectLatch = new CountDownLatch(1);
final AtomicBoolean err = new AtomicBoolean(false);
@@ -1501,8 +1504,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
Ignite client = G.ignite("client-0");
client.events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event evt) {
+ @Override public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
log.info("Disconnected event.");
@@ -1510,7 +1512,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertEquals(1, disconnectLatch.getCount());
disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
log.info("Reconnected event.");
assertEquals(1, reconnectLatch.getCount());
@@ -1518,7 +1521,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertFalse(err.get());
reconnectLatch.countDown();
- } else {
+ }
+ else {
log.error("Unexpected event: " + evt);
err.set(true);
@@ -1545,6 +1549,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+ clientNodeIds.clear();
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ checkNodes(1, 1);
+
assertFalse(err.get());
}
@@ -1552,6 +1561,87 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testDisconnectAfterNetworkTimeout() throws Exception {
+ netTimeout = 5000;
+ joinTimeout = 60_000;
+ maxMissedClientHbs = 2;
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ final Ignite srv = G.ignite("server-0");
+ Ignite client = G.ignite("client-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+ TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final AtomicBoolean err = new AtomicBoolean(false);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected event.");
+
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(1, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ log.info("Reconnected event.");
+
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(0, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ reconnectLatch.countDown();
+ }
+ else {
+ log.error("Unexpected event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+ log.info("Fail client connection1.");
+
+ srvSpi.failClientReconnect.set(1_000_000);
+ srvSpi.skipNodeAdded = true;
+
+ clientSpi.brakeConnection();
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ log.info("Fail client connection2.");
+
+ srvSpi.failClientReconnect.set(0);
+ srvSpi.skipNodeAdded = false;
+
+ clientSpi.brakeConnection();
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ clientNodeIds.clear();
+
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ return srv.cluster().nodes().size() == 2;
+ }
+ }, 10_000);
+
+ checkNodes(1, 1);
+
+ assertFalse(err.get());
}
/**
@@ -1834,6 +1924,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private volatile String delayJoinAckFor;
+ /** */
+ private volatile boolean skipNodeAdded;
+
/**
* @param lock Lock.
*/
@@ -1906,6 +1999,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
boolean fail = false;
+ if (skipNodeAdded &&
+ (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) {
+ log.info("Skip message: " + msg);
+
+ return;
+ }
+
if (msg instanceof TcpDiscoveryNodeAddedMessage)
fail = failNodeAdded.getAndDecrement() > 0;
else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
[4/8] incubator-ignite git commit: #ignite-gg-10526: fix consistentId.
Posted by sb...@apache.org.
#ignite-gg-10526: fix consistentId.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/63867945
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/63867945
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/63867945
Branch: refs/heads/ignite-901
Commit: 638679451df99499b5699534b39da08b665cf5ba
Parents: e3fba88
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 10 10:42:17 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 10 10:48:38 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/internal/util/IgniteUtils.java | 6 +-----
.../ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java | 8 ++++++--
2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63867945/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 46a23d6..f457d6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8048,13 +8048,9 @@ public abstract class IgniteUtils {
public static String consistentId(Collection<String> addrs, int port) {
assert !F.isEmpty(addrs);
- List<String> sortedAddrs = new ArrayList<>(addrs);
-
- Collections.sort(sortedAddrs);
-
StringBuilder sb = new StringBuilder();
- for (String addr : sortedAddrs)
+ for (String addr : addrs)
sb.append(addr).append(',');
sb.delete(sb.length() - 1, sb.length());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63867945/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 4b4df45..22f56c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -143,13 +143,17 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
assert ver != null;
this.id = id;
- this.addrs = addrs;
+
+ List<String> sortedAddrs = new ArrayList<>(addrs);
+ Collections.sort(sortedAddrs);
+
+ this.addrs = sortedAddrs;
this.hostNames = hostNames;
this.discPort = discPort;
this.metricsProvider = metricsProvider;
this.ver = ver;
- consistentId = U.consistentId(addrs, discPort);
+ consistentId = U.consistentId(sortedAddrs, discPort);
metrics = metricsProvider.metrics();
cacheMetrics = metricsProvider.cacheMetrics();
[5/8] incubator-ignite git commit: # ignite-901
Posted by sb...@apache.org.
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a6222c93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a6222c93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a6222c93
Branch: refs/heads/ignite-901
Commit: a6222c93d25099757bcfa9fc85305d6ece4d6ebd
Parents: 00d151b
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 12:11:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 12:11:46 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 26 +++++++++++---------
.../processors/cache/GridCacheProcessor.java | 2 +-
.../IgniteClientReconnectCacheTest.java | 4 +--
3 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6222c93/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8e7fc97..2293462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -294,17 +294,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/** {@inheritDoc} */
- @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
- locJoinEvt = new GridFutureAdapter<>();
-
- discoCacheHist.clear();
-
- topHist.clear();
-
- registeredCaches.clear();
- }
-
- /** {@inheritDoc} */
@Override protected void onKernalStart0() throws IgniteCheckedException {
if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode())
ctx.performance().add("Enable client mode for TcpDiscoverySpi " +
@@ -385,6 +374,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Map<Long, Collection<ClusterNode>> snapshots,
@Nullable DiscoverySpiCustomMessage spiCustomMsg
) {
+ if (type == EVT_NODE_JOINED && node.isLocal() && ctx.clientDisconnected()) {
+ discoCacheHist.clear();
+
+ topHist.clear();
+
+ topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, null));
+ }
+
DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
: ((CustomMessageWrapper)spiCustomMsg).delegate();
@@ -466,7 +463,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// If this is a local join event, just save it and do not notify listeners.
if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
- gridStartTime = spi.getGridStartTime();
+ if (gridStartTime == 0)
+ gridStartTime = spi.getGridStartTime();
DiscoveryEvent discoEvt = new DiscoveryEvent();
@@ -495,6 +493,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
assert node.isClient() : node;
((IgniteKernal)ctx.grid()).onDisconnected();
+
+ locJoinEvt = new GridFutureAdapter<>();
+
+ registeredCaches.clear();
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6222c93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f58ef6d..767b62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -951,7 +951,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
- List<GridCacheAdapter> reconnected = new ArrayList<>();
+ List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
for (GridCacheAdapter cache : caches.values()) {
String name = cache.name();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6222c93/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 36ea63f..aae7162 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -966,9 +966,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
/**
*
*/
- static class TestClass1 implements Serializable {
- int val;
- }
+ static class TestClass1 implements Serializable {}
/**
*
[3/8] incubator-ignite git commit: # ignite-929 close does not
destroy cache
Posted by sb...@apache.org.
# ignite-929 close does not destroy cache
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3fba883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3fba883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3fba883
Branch: refs/heads/ignite-901
Commit: e3fba883ab69cd7f32296633558db3b7f6442ab2
Parents: 90580d8
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 09:20:11 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 09:20:11 2015 +0300
----------------------------------------------------------------------
.../examples/ScalarCacheAffinityExample.scala | 2 +-
.../scalar/examples/ScalarCacheExample.scala | 2 +-
.../ScalarCachePopularNumbersExample.scala | 2 +-
.../examples/ScalarCacheQueryExample.scala | 2 +-
.../examples/ScalarSnowflakeSchemaExample.scala | 4 +-
.../java/org/apache/ignite/IgniteCache.java | 14 +-
.../org/apache/ignite/cache/CacheManager.java | 13 +-
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../discovery/GridDiscoveryManager.java | 23 +-
.../cache/DynamicCacheChangeRequest.java | 39 +-
.../processors/cache/GridCacheGateway.java | 4 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 102 ++-
.../processors/cache/IgniteCacheProxy.java | 448 +++++++---
.../distributed/dht/GridDhtCacheEntry.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 30 +-
.../visor/cache/VisorCacheStopTask.java | 2 +-
.../affinity/IgniteClientNodeAffinityTest.java | 14 +-
.../IgniteFairAffinityDynamicCacheSelfTest.java | 3 +-
...cheStoreSessionListenerAbstractSelfTest.java | 111 ++-
.../GridCacheTxLoadFromStoreOnLockSelfTest.java | 34 +-
.../CacheMetricsForClusterGroupSelfTest.java | 10 +-
.../cache/CacheOffheapMapEntrySelfTest.java | 7 +-
.../cache/CacheStopAndDestroySelfTest.java | 859 +++++++++++++++++++
...eUsageMultinodeDynamicStartAbstractTest.java | 2 +-
...ProjectionForCachesOnDaemonNodeSelfTest.java | 2 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 140 +--
...teCacheClientNodePartitionsExchangeTest.java | 29 +-
...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 2 +-
.../DataStreamerMultinodeCreateCacheTest.java | 14 +-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../CacheConfigurationP2PTestClient.java | 4 +-
32 files changed, 1593 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
index fbf66bc..40b947d 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheAffinityExample.scala
@@ -62,7 +62,7 @@ object ScalarCacheAffinityExample extends App {
visitUsingMapKeysToNodes(cache)
}
finally {
- cache.close()
+ cache.destroy()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
index 42e8ca4..0bf8d6f 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheExample.scala
@@ -50,7 +50,7 @@ object ScalarCacheExample extends App {
basicOperations()
}
finally {
- cache.close()
+ cache.destroy()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
index 828c5a3..d113297 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala
@@ -93,7 +93,7 @@ object ScalarCachePopularNumbersExample extends App {
}
}
finally {
- cache.close()
+ cache.destroy()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
index b8054eb..1a42947 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheQueryExample.scala
@@ -55,7 +55,7 @@ object ScalarCacheQueryExample {
example(ignite$)
}
finally {
- cache.close()
+ cache.destroy()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
index 2656f44..33b2fcc 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarSnowflakeSchemaExample.scala
@@ -86,11 +86,11 @@ object ScalarSnowflakeSchemaExample {
queryProductPurchases()
}
finally {
- factCache.close()
+ factCache.destroy()
}
}
finally {
- dimCache.close()
+ dimCache.destroy()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index c8d6d7a..4938ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -543,9 +543,21 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
CacheEntryProcessor<K, V, T> entryProcessor, Object... args);
/**
+ * Closes this cache instance.
+ * <p>
+ * For local cache equivalent to {@link #destroy()}.
+ * For distributed caches, if called on clients, stops client cache, if called on a server node,
+ * just closes this cache instance and does not destroy cache data.
+ * <p>
+ * After cache instance is closed another {@link IgniteCache} instance for the same
+ * cache can be created using {@link Ignite#cache(String)} method.
+ */
+ @Override public void close();
+
+ /**
* Completely deletes the cache with all its data from the system on all cluster nodes.
*/
- @Override void close();
+ public void destroy();
/**
* This cache node to re-balance its partitions. This method is usually used when
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
index 9ba50d1..bc6df76 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java
@@ -130,6 +130,7 @@ public class CacheManager implements javax.cache.CacheManager {
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public <K, V, C extends Configuration<K, V>> Cache<K, V> createCache(String cacheName, C cacheCfg)
throws IllegalArgumentException {
kernalGateway.readLock();
@@ -155,11 +156,11 @@ public class CacheManager implements javax.cache.CacheManager {
IgniteCache<K, V> res = ignite.createCache(igniteCacheCfg);
- ((IgniteCacheProxy<K, V>)res).setCacheManager(this);
-
if (res == null)
throw new CacheException();
+ ((IgniteCacheProxy<K, V>)res).setCacheManager(this);
+
if (igniteCacheCfg.isManagementEnabled())
enableManagement(cacheName, true);
@@ -219,6 +220,7 @@ public class CacheManager implements javax.cache.CacheManager {
/**
* @param cacheName Cache name.
+ * @return Cache.
*/
@Nullable private <K, V> IgniteCache<K, V> getCache0(String cacheName) {
if (cacheName == null)
@@ -272,11 +274,13 @@ public class CacheManager implements javax.cache.CacheManager {
}
if (cache != null)
- cache.close();
+ cache.destroy();
}
/**
* @param cacheName Cache name.
+ * @param objName Object name.
+ * @return Object name instance.
*/
private ObjectName getObjectName(String cacheName, String objName) {
String mBeanName = "javax.cache:type=" + objName + ",CacheManager="
@@ -339,7 +343,8 @@ public class CacheManager implements javax.cache.CacheManager {
/**
* @param mxbean MXBean.
- * @param name cache name.
+ * @param name Cache name.
+ * @param beanType Bean type.
*/
private void registerCacheObject(Object mxbean, String name, String beanType) {
MBeanServer mBeanSrv = ignite.configuration().getMBeanServer();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d6ddf79..024dc7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2436,7 +2436,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
IgniteInternalFuture<?> stopFut;
try {
- stopFut = ctx.cache().dynamicStopCache(cacheName);
+ stopFut = ctx.cache().dynamicDestroyCache(cacheName);
}
finally {
unguard();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a8ce8ff..eae07ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -263,6 +263,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Removes near node ID from cache filter.
+ *
+ * @param cacheName Cache name.
+ * @param clientNodeId Near node ID.
+ */
+ public void onClientCacheClose(String cacheName, UUID clientNodeId) {
+ CachePredicate predicate = registeredCaches.get(cacheName);
+
+ if (predicate != null)
+ predicate.onNodeLeft(clientNodeId);
+ }
+
+ /**
* @return Client nodes map.
*/
public Map<String, Map<UUID, Boolean>> clientNodesMap() {
@@ -1079,9 +1092,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node for given ID is alive.
*/
public boolean alive(UUID nodeId) {
+ return getAlive(nodeId) != null;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return Node if node is alive.
+ */
+ @Nullable public ClusterNode getAlive(UUID nodeId) {
assert nodeId != null;
- return getSpi().getNode(nodeId) != null; // Go directly to SPI without checking disco cache.
+ return getSpi().getNode(nodeId); // Go directly to SPI without checking disco cache.
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index c08a179..7af1572 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -57,6 +57,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Stop flag. */
private boolean stop;
+ /** Close flag. */
+ private boolean close;
+
/** Fail if exists flag. */
private boolean failIfExists;
@@ -68,23 +71,10 @@ public class DynamicCacheChangeRequest implements Serializable {
*
* @param cacheName Cache stop name.
* @param initiatingNodeId Initiating node ID.
- * @param stop Stop flag.
*/
- public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId, boolean stop) {
+ public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) {
this.cacheName = cacheName;
this.initiatingNodeId = initiatingNodeId;
-
- this.stop = stop;
- }
-
- /**
- * Constructor means for start requests.
- *
- * @param cacheName Cache name.
- * @param initiatingNodeId Initiating node ID.
- */
- public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) {
- this(cacheName, initiatingNodeId, false);
}
/**
@@ -130,6 +120,13 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
+ * @param stop New stop flag.
+ */
+ public void stop(boolean stop) {
+ this.stop = stop;
+ }
+
+ /**
* @return Cache name.
*/
public String cacheName() {
@@ -220,6 +217,20 @@ public class DynamicCacheChangeRequest implements Serializable {
this.failIfExists = failIfExists;
}
+ /**
+ * @return Close flag.
+ */
+ public boolean close() {
+ return close;
+ }
+
+ /**
+ * @param close New close flag.
+ */
+ public void close(boolean close) {
+ this.close = close;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index d9d151c..f2beb0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -68,7 +68,7 @@ public class GridCacheGateway<K, V> {
*
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
- public boolean enterIfNotClosed() {
+ public boolean enterIfNotStopped() {
onEnter();
// Must unlock in case of unexpected errors to avoid
@@ -89,7 +89,7 @@ public class GridCacheGateway<K, V> {
*
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
- public boolean enterIfNotClosedNoLock() {
+ public boolean enterIfNotStoppedNoLock() {
onEnter();
return !stopped;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index af87685..4398b4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -156,16 +156,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Validate requests to check if event should trigger partition exchange.
for (DynamicCacheChangeRequest req : batch.requests()) {
- if (cctx.cache().dynamicCacheRegistered(req))
+ if (cctx.cache().exchangeNeeded(req))
valid.add(req);
else
cctx.cache().completeStartFuture(req);
}
if (!F.isEmpty(valid)) {
- exchId = exchangeId(n.id(),
- affinityTopologyVersion(e),
- e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
exchFut = exchangeFuture(exchId, e, valid);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index de1eac2..bb87a86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1390,10 +1390,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return {@code True} if change request was registered to apply.
*/
@SuppressWarnings("IfMayBeConditional")
- public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
+ public boolean exchangeNeeded(DynamicCacheChangeRequest req) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
if (desc != null) {
+ if (req.close()) {
+ assert req.initiatingNodeId() != null : req;
+
+ return true;
+ }
+
if (desc.deploymentId().equals(req.deploymentId())) {
if (req.start())
return !desc.cancelled();
@@ -1515,20 +1521,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Stop request.
*/
public void blockGateway(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close();
- // Break the proxy before exchange future is done.
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
+ if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
+ // Break the proxy before exchange future is done.
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
- if (proxy != null)
- proxy.gate().block();
+ if (proxy != null) {
+ if (req.stop())
+ proxy.gate().block();
+ else
+ proxy.closeProxy();
+ }
+ }
}
/**
* @param req Request.
*/
private void stopGateway(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() : req;
// Break the proxy before exchange future is done.
IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName()));
@@ -1541,7 +1553,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Stop request.
*/
public void prepareCacheStop(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close() : req;
GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName()));
@@ -1597,6 +1609,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
registeredCaches.remove(masked, desc);
}
+ else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
+
+ if (proxy != null) {
+ if (proxy.context().affinityNode()) {
+ GridCacheAdapter<?, ?> cache = caches.get(masked);
+
+ if (cache != null)
+ jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
+ }
+ else {
+ proxy.context().gate().onStopped();
+
+ prepareCacheStop(req);
+ }
+ }
+ }
completeStartFuture(req);
}
@@ -2005,13 +2034,35 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param cacheName Cache name to stop.
- * @return Future that will be completed when cache is stopped.
+ * @param cacheName Cache name to destroy.
+ * @return Future that will be completed when cache is destroyed.
*/
- public IgniteInternalFuture<?> dynamicStopCache(String cacheName) {
+ public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) {
checkEmptyTransactions();
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true);
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+
+ t.stop(true);
+
+ return F.first(initiateCacheChanges(F.asList(t), false));
+ }
+
+
+ /**
+ * @param cacheName Cache name to close.
+ * @return Future that will be completed when cache is closed.
+ */
+ public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName));
+
+ if (proxy == null || proxy.proxyClosed())
+ return new GridFinishedFuture<>(); // No-op.
+
+ checkEmptyTransactions();
+
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+
+ t.close(true);
return F.first(initiateCacheChanges(F.asList(t), false));
}
@@ -2031,16 +2082,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req);
try {
- if (req.stop()) {
+ if (req.stop() || req.close()) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
if (desc == null)
// No-op.
fut.onDone();
else {
+ assert desc.cacheConfiguration() != null : desc;
+
+ if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) {
+ req.close(false);
+
+ req.stop(true);
+ }
+
IgniteUuid dynamicDeploymentId = desc.deploymentId();
- assert dynamicDeploymentId != null;
+ assert dynamicDeploymentId != null : desc;
// Save deployment ID to avoid concurrent stops.
req.deploymentId(dynamicDeploymentId);
@@ -2188,9 +2247,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.nearCacheConfiguration() != null);
}
else {
+ assert req.stop() || req.close() : req;
+
if (desc == null) {
- // If local node initiated start, fail the start future.
- DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+ // If local node initiated start, finish future.
+ DynamicCacheStartFuture changeFut =
+ (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) {
// No-op.
@@ -2200,9 +2262,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return;
}
- desc.onCancelled();
+ if (req.stop()) {
+ desc.onCancelled();
- ctx.discovery().removeCacheFilter(req.cacheName());
+ ctx.discovery().removeCacheFilter(req.cacheName());
+ }
+ else
+ ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b31b2e8..9767f49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
-import org.apache.ignite.cache.CacheManager;
import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.CacheManager;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
@@ -171,19 +171,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public CacheMetrics metrics() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().metrics();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public CacheMetrics metrics(ClusterGroup grp) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
@@ -202,19 +206,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public CacheMetricsMXBean mxBean() {
- CacheOperationContext prev = gate.enter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().mxBean();
}
finally {
- gate.leave(prev);
+ onLeave(gate, prev);
}
}
@@ -230,19 +236,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Nullable @Override public Cache.Entry<K, V> randomEntry() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().randomEntry();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
@@ -251,7 +261,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -262,7 +272,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withNoRetries() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
boolean noRetries = opCtx != null && opCtx.noRetries();
@@ -280,14 +292,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -296,7 +310,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
ctx.cache().globalLoadCache(p, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -307,7 +321,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -316,7 +332,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.localLoadCache(p, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -327,7 +343,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -339,7 +357,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndPutIfAbsent(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -359,13 +377,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean isLocalLocked(K key, boolean byCurrThread) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -379,7 +399,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
final CacheQuery<Map.Entry<K,V>> qry;
final CacheQueryFuture<Map.Entry<K,V>> fut;
- boolean isKeepPortable = opCtx != null ? opCtx.isKeepPortable() : false;
+ boolean isKeepPortable = opCtx != null && opCtx.isKeepPortable();
if (filter instanceof ScanQuery) {
IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
@@ -444,11 +464,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * @param local Enforce local.
+ * @param loc Enforce local.
* @return Local node cluster group.
*/
- private ClusterGroup projection(boolean local) {
- if (local || ctx.isLocal() || isReplicatedDataNode())
+ private ClusterGroup projection(boolean loc) {
+ if (loc || ctx.isLocal() || isReplicatedDataNode())
return ctx.kernalContext().grid().cluster().forLocal();
if (ctx.isReplicated())
@@ -517,7 +537,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <R> QueryCursor<R> query(Query<R> qry) {
A.notNull(qry, "qry");
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -558,7 +580,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw new CacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -589,7 +611,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localEntries(peekModes);
@@ -598,37 +622,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public QueryMetrics queryMetrics() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.context().queries().metrics();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localEvict(Collection<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.evictAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localPeek(key, peekModes, null);
@@ -637,20 +667,22 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localPromote(Set<? extends K> keys) throws CacheException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.promoteAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -660,7 +692,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public int size(CachePeekMode... peekModes) throws CacheException {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -675,13 +709,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public int localSize(CachePeekMode... peekModes) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.localSize(peekModes);
@@ -690,14 +726,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public V get(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -709,7 +747,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.get(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -720,7 +758,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -732,7 +772,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -743,7 +783,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -755,7 +797,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAllOutTx(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -769,7 +811,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
*/
public Map<K, V> getAll(Collection<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -781,7 +825,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -796,19 +840,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Entry set.
*/
public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return delegate.entrySetx(filter);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public boolean containsKey(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -820,13 +868,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.containsKey(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public boolean containsKeys(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -838,7 +888,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.containsKeys(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -848,7 +898,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
boolean replaceExisting,
@Nullable final CompletionListener completionLsnr
) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting);
@@ -869,14 +921,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void put(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -896,7 +950,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.put(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -907,7 +961,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -919,7 +975,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndPut(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -930,7 +986,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> map) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -939,7 +997,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.putAll(map);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -950,7 +1008,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -962,7 +1022,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.putIfAbsent(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -973,7 +1033,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean remove(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -985,7 +1047,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.remove(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -996,7 +1058,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean remove(K key, V oldVal) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1008,7 +1072,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.remove(key, oldVal);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1019,7 +1083,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndRemove(K key) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1031,7 +1097,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndRemove(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1042,7 +1108,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1054,7 +1122,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.replace(key, oldVal, newVal);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1065,7 +1133,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1077,7 +1147,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.replace(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1088,7 +1158,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1100,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.getAndReplace(key, val);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1111,7 +1183,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void removeAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1120,7 +1194,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
delegate.removeAll(keys);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1130,7 +1204,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void removeAll() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1142,13 +1218,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clear(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1160,13 +1238,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1178,13 +1258,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void clear() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync())
@@ -1196,32 +1278,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localClear(K key) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
delegate.clearLocally(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void localClearAll(Set<? extends K> keys) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
for (K key : keys)
delegate.clearLocally(key);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1229,7 +1315,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1255,7 +1343,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1267,7 +1355,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1293,7 +1383,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1303,10 +1393,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
- EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1318,7 +1410,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1331,7 +1423,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
CacheEntryProcessor<K, V, T> entryProcessor,
Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1343,7 +1437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1356,7 +1450,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args) {
try {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
if (isAsync()) {
@@ -1368,7 +1464,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return delegate.invokeAll(map, args);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
catch (IgniteCheckedException e) {
@@ -1394,17 +1490,43 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public void destroy() {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
+ return;
+
+ IgniteInternalFuture<?> fut;
+
+ try {
+ fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name());
+ }
+ finally {
+ onLeave(gate);
+ }
+
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
- if (!onEnterIfNoClose())
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
return;
IgniteInternalFuture<?> fut;
try {
- fut = ctx.kernalContext().cache().dynamicStopCache(ctx.name());
+ fut = ctx.kernalContext().cache().dynamicCloseCache(ctx.name());
}
finally {
- onLeave();
+ onLeave(gate);
}
try {
@@ -1417,14 +1539,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public boolean isClosed() {
- if (!onEnterIfNoClose())
+ GridCacheGateway<K, V> gate = this.gate;
+
+ if (!onEnterIfNoStop(gate))
return true;
try {
return ctx.kernalContext().cache().context().closed(ctx);
}
finally {
- onLeave();
+ onLeave(gate);
}
}
@@ -1448,7 +1572,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
@@ -1457,13 +1583,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
@@ -1472,19 +1600,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
throw cacheException(e);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
/** {@inheritDoc} */
@Override public Iterator<Cache.Entry<K, V>> iterator() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
return ctx.cache().igniteIterator();
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1516,8 +1646,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
*
* @return Projection for portable objects.
*/
+ @SuppressWarnings("unchecked")
public <K1, V1> IgniteCache<K1, V1> keepPortable() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
CacheOperationContext opCtx0 =
@@ -1535,7 +1668,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1543,7 +1676,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Cache with skip store enabled.
*/
public IgniteCache<K, V> skipStore() {
- CacheOperationContext prev = onEnter(opCtx);
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
try {
boolean skip = opCtx != null && opCtx.skipStore();
@@ -1565,7 +1700,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
lock);
}
finally {
- onLeave(prev);
+ onLeave(gate, prev);
}
}
@@ -1592,10 +1727,69 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
+ * @return {@code True} if proxy was closed.
+ */
+ public boolean proxyClosed() {
+ return !gate.getClass().equals(GridCacheGateway.class);
+ }
+
+ /**
+ * Closes this proxy instance.
+ */
+ public void closeProxy() {
+ gate = new GridCacheGateway<K, V>(ctx) {
+ @Override public void enter() {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Override public boolean enterIfNotStopped() {
+ return false;
+ }
+
+ @Override public boolean enterIfNotStoppedNoLock() {
+ return false;
+ }
+
+ @Override public void leaveNoLock() {
+ assert false;
+ }
+
+ @Override public void leave() {
+ assert false;
+ }
+
+ @Nullable @Override public CacheOperationContext enter(@Nullable CacheOperationContext opCtx) {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Nullable @Override public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
+ throw new IllegalStateException("Cache has been closed: " + ctx.name());
+ }
+
+ @Override public void leave(CacheOperationContext prev) {
+ assert false;
+ }
+
+ @Override public void leaveNoLock(CacheOperationContext prev) {
+ assert false;
+ }
+
+ @Override public void block() {
+ // No-op.
+ }
+
+ @Override public void onStopped() {
+ // No-op.
+ }
+ };
+ }
+
+ /**
+ * @param gate Cache gateway.
* @param opCtx Cache operation context to guard.
* @return Previous projection set on this thread.
*/
- private CacheOperationContext onEnter(CacheOperationContext opCtx) {
+ private CacheOperationContext onEnter(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
if (lock)
return gate.enter(opCtx);
else
@@ -1603,21 +1797,21 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * On enter.
- *
+ * @param gate Cache gateway.
* @return {@code True} if enter successful.
*/
- private boolean onEnterIfNoClose() {
+ private boolean onEnterIfNoStop(GridCacheGateway<K, V> gate) {
if (lock)
- return gate.enterIfNotClosed();
+ return gate.enterIfNotStopped();
else
- return gate.enterIfNotClosedNoLock();
+ return gate.enterIfNotStoppedNoLock();
}
/**
+ * @param gate Cache gateway.
* @param opCtx Operation context to guard.
*/
- private void onLeave(CacheOperationContext opCtx) {
+ private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
if (lock)
gate.leave(opCtx);
else
@@ -1625,9 +1819,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * On leave.
+ * @param gate Cache gateway.
*/
- private void onLeave() {
+ private void onLeave(GridCacheGateway<K, V> gate) {
if (lock)
gate.leave();
else
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 89b85c4..3b411b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -597,7 +597,9 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
List<ReaderId> newRdrs = null;
for (int i = 0; i < rdrs.length; i++) {
- if (!cctx.discovery().alive(rdrs[i].nodeId())) {
+ ClusterNode node = cctx.discovery().getAlive(rdrs[i].nodeId());
+
+ if (node == null || !cctx.discovery().cacheNode(node, cacheName())) {
// Node has left and if new list has already been created, just skip.
// Otherwise, create new list and add alive nodes.
if (newRdrs == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 38a0d55..5701749 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -474,6 +474,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
oldestNode.set(oldest);
+ if (!F.isEmpty(reqs))
+ blockGateways();
+
startCaches();
// True if client node joined or failed.
@@ -489,24 +492,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
else {
assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
- boolean clientOnlyStart = true;
+ boolean clientOnlyCacheEvt = true;
for (DynamicCacheChangeRequest req : reqs) {
- if (!req.clientStartOnly()) {
- clientOnlyStart = false;
+ if (req.clientStartOnly() || req.close())
+ continue;
- break;
- }
+ clientOnlyCacheEvt = false;
+
+ break;
}
- clientNodeEvt = clientOnlyStart;
+ clientNodeEvt = clientOnlyCacheEvt;
}
if (clientNodeEvt) {
ClusterNode node = discoEvt.eventNode();
// Client need to initialize affinity for local join event or for stated client caches.
- if (!node.isLocal()) {
+ if (!node.isLocal() || clientCacheClose()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -733,9 +737,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("After waiting for partition release future: " + this);
- if (!F.isEmpty(reqs))
- blockGateways();
-
if (exchId.isLeft())
cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
@@ -839,6 +840,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return {@code True} if exchange initiated for client cache close.
+ */
+ private boolean clientCacheClose() {
+ return reqs != null && reqs.size() == 1 && reqs.iterator().next().close();
+ }
+
+ /**
*
*/
private void dumpPendingObjects() {
@@ -903,7 +911,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*/
private void blockGateways() {
for (DynamicCacheChangeRequest req : reqs) {
- if (req.stop())
+ if (req.stop() || req.close())
cctx.cache().blockGateway(req);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
index 0e848f9..83d19f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStopTask.java
@@ -56,7 +56,7 @@ public class VisorCacheStopTask extends VisorOneNodeTask<String, Void> {
@Override protected Void run(String cacheName) {
IgniteCache cache = ignite.cache(cacheName);
- cache.close();
+ cache.destroy();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
index 467349f..da27fb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
@@ -127,13 +127,23 @@ public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest {
ccfg.setNodeFilter(new TestNodesFilter());
- try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg)) {
+ IgniteCache<Integer, Integer> cache = client.createCache(ccfg);
+
+ try {
checkCache(null, 1);
}
+ finally {
+ cache.destroy();
+ }
- try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg, new NearCacheConfiguration())) {
+ cache = client.createCache(ccfg, new NearCacheConfiguration());
+
+ try {
checkCache(null, 1);
}
+ finally {
+ cache.destroy();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
index 18b77e0..e51be58 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
@@ -84,8 +84,7 @@ public class IgniteFairAffinityDynamicCacheSelfTest extends GridCommonAbstractTe
cache.put(i, i);
IgniteInternalFuture<Object> destFut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
+ @Override public Object call() throws Exception {
ignite(0).destroyCache(cache.getName());
return null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
index 0634197..8e53f05 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerAbstractSelfTest.java
@@ -113,12 +113,17 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
public void testAtomicCache() throws Exception {
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.ATOMIC);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
+
+ try {
cache.loadCache(null);
cache.get(1);
cache.put(1, 1);
cache.remove(1);
}
+ finally {
+ cache.destroy();
+ }
assertEquals(3, loadCacheCnt.get());
assertEquals(1, loadCnt.get());
@@ -133,12 +138,17 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
public void testTransactionalCache() throws Exception {
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
+
+ try {
cache.loadCache(null);
cache.get(1);
cache.put(1, 1);
cache.remove(1);
}
+ finally {
+ cache.destroy();
+ }
assertEquals(3, loadCacheCnt.get());
assertEquals(1, loadCnt.get());
@@ -153,15 +163,18 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
public void testExplicitTransaction() throws Exception {
CacheConfiguration<Integer, Integer> cfg = cacheConfiguration(null, CacheAtomicityMode.TRANSACTIONAL);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg)) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache.put(1, 1);
- cache.put(2, 2);
- cache.remove(3);
- cache.remove(4);
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cfg);
- tx.commit();
- }
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache.put(1, 1);
+ cache.put(2, 2);
+ cache.remove(3);
+ cache.remove(4);
+
+ tx.commit();
+ }
+ finally {
+ cache.destroy();
}
assertEquals(2, writeCnt.get());
@@ -176,18 +189,20 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
- try (
- IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
- IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
- ) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache1.put(1, 1);
- cache2.put(2, 2);
- cache1.remove(3);
- cache2.remove(4);
-
- tx.commit();
- }
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2);
+
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+ cache1.remove(3);
+ cache2.remove(4);
+
+ tx.commit();
+ }
+ finally {
+ cache1.destroy();
+ cache2.destroy();
}
assertEquals(2, writeCnt.get());
@@ -204,16 +219,18 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
- try (
- IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
- IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
- ) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache1.put(1, 1);
- cache2.put(2, 2);
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2);
- tx.commit();
- }
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
+
+ tx.commit();
+ }
+ finally {
+ cache1.destroy();
+ cache2.destroy();
}
try (Connection conn = DriverManager.getConnection(URL)) {
@@ -232,25 +249,27 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);
- try (
- IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
- IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
- ) {
- try (Transaction tx = ignite(0).transactions().txStart()) {
- cache1.put(1, 1);
- cache2.put(2, 2);
+ IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
+ IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2);
- tx.commit();
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ cache1.put(1, 1);
+ cache2.put(2, 2);
- assert false : "Exception was not thrown.";
- }
- catch (IgniteException e) {
- CacheWriterException we = X.cause(e, CacheWriterException.class);
+ tx.commit();
- assertNotNull(we);
+ assert false : "Exception was not thrown.";
+ }
+ catch (IgniteException e) {
+ CacheWriterException we = X.cause(e, CacheWriterException.class);
+
+ assertNotNull(we);
- assertEquals("Expected failure.", we.getMessage());
- }
+ assertEquals("Expected failure.", we.getMessage());
+ }
+ finally {
+ cache1.destroy();
+ cache2.destroy();
}
try (Connection conn = DriverManager.getConnection(URL)) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
index 7b01f0f..bc6b443 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
@@ -92,31 +92,33 @@ public class GridCacheTxLoadFromStoreOnLockSelfTest extends GridCommonAbstractTe
cacheCfg.setBackups(backups);
cacheCfg.setLoadPreviousValue(true);
- try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg)) {
- for (int i = 0; i < 10; i++)
- assertEquals((Integer)i, cache.get(i));
+ IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg);
- cache.removeAll();
+ for (int i = 0; i < 10; i++)
+ assertEquals((Integer)i, cache.get(i));
- assertEquals(0, cache.size());
+ cache.removeAll();
- for (TransactionConcurrency conc : TransactionConcurrency.values()) {
- for (TransactionIsolation iso : TransactionIsolation.values()) {
- info("Checking transaction [conc=" + conc + ", iso=" + iso + ']');
+ assertEquals(0, cache.size());
- try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) {
- for (int i = 0; i < 10; i++)
- assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']',
- (Integer)i, cache.get(i));
+ for (TransactionConcurrency conc : TransactionConcurrency.values()) {
+ for (TransactionIsolation iso : TransactionIsolation.values()) {
+ info("Checking transaction [conc=" + conc + ", iso=" + iso + ']');
- tx.commit();
- }
+ try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) {
+ for (int i = 0; i < 10; i++)
+ assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']',
+ (Integer)i, cache.get(i));
- cache.removeAll();
- assertEquals(0, cache.size());
+ tx.commit();
}
+
+ cache.removeAll();
+ assertEquals(0, cache.size());
}
}
+
+ cache.destroy();
}
/**
[2/8] incubator-ignite git commit: # ignite-929 close does not
destroy cache
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
index 1ba24e3..5093af5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
@@ -107,7 +107,7 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
assertMetrics(cache1);
assertMetrics(cache2);
- closeCaches();
+ destroyCaches();
}
/**
@@ -135,7 +135,7 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
assertMetrics(cache1);
assertMetrics(cache2);
- closeCaches();
+ destroyCaches();
}
/**
@@ -157,9 +157,9 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
/**
* Closes caches.
*/
- private void closeCaches() {
- cache1.close();
- cache2.close();
+ private void destroyCaches() {
+ cache1.destroy();
+ cache2.destroy();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
index 8c7d33d..f4d7607 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
@@ -149,7 +149,9 @@ public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
cacheMode,
"Cache");
- try (IgniteCache jcache = grid(0).getOrCreateCache(cfg)) {
+ IgniteCache jcache = grid(0).getOrCreateCache(cfg);
+
+ try {
GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName());
Integer key = primaryKey(grid(0).cache(null));
@@ -164,5 +166,8 @@ public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
assertEquals(entry.getClass(), entryCls);
}
+ finally {
+ jcache.destroy();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
new file mode 100644
index 0000000..20284a8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import javax.cache.CacheManager;
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Checks stop and destroy methods behavior.
+ */
+public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** key-value used at test. */
+ protected static String KEY_VAL = "1";
+
+ /** cache name 1. */
+ protected static String CACHE_NAME_DHT = "cache";
+
+ /** cache name 2. */
+ protected static String CACHE_NAME_CLIENT = "cache_client";
+
+ /** near cache name. */
+ protected static String CACHE_NAME_NEAR = "cache_near";
+
+ /** local cache name. */
+ protected static String CACHE_NAME_LOC = "cache_local";
+
+ /** */
+ private static volatile boolean stop;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGridsMultiThreaded(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @return Grids count to start.
+ */
+ protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ if (getTestGridName(2).equals(gridName))
+ iCfg.setClientMode(true);
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ iCfg.setCacheConfiguration();
+
+ TcpCommunicationSpi commSpi = new CountingTxRequestsToClientNodeTcpCommunicationSpi();
+
+ commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+ commSpi.setTcpNoDelay(true);
+
+ iCfg.setCommunicationSpi(commSpi);
+
+ return iCfg;
+ }
+
+ /**
+ * Helps to count messages.
+ */
+ public static class CountingTxRequestsToClientNodeTcpCommunicationSpi extends TcpCommunicationSpi {
+ /** Counter. */
+ public static AtomicInteger cnt = new AtomicInteger();
+
+ /** Node filter. */
+ public static UUID nodeFilter;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ super.sendMessage(node, msg);
+
+ if (nodeFilter != null &&
+ node.id().equals(nodeFilter) &&
+ msg instanceof GridIoMessage &&
+ ((GridIoMessage)msg).message() instanceof GridDhtTxPrepareRequest)
+ cnt.incrementAndGet();
+ }
+ }
+
+ /**
+ * @return dht config
+ */
+ private CacheConfiguration getDhtConfig() {
+ CacheConfiguration cfg = defaultCacheConfiguration();
+
+ cfg.setName(CACHE_NAME_DHT);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setNearConfiguration(null);
+
+ return cfg;
+ }
+
+ /**
+ * @return client config
+ */
+ private CacheConfiguration getClientConfig() {
+ CacheConfiguration cfg = defaultCacheConfiguration();
+
+ cfg.setName(CACHE_NAME_CLIENT);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setNearConfiguration(null);
+
+ return cfg;
+ }
+
+ /**
+ * @return near config
+ */
+ private CacheConfiguration getNearConfig() {
+ CacheConfiguration cfg = defaultCacheConfiguration();
+
+ cfg.setName(CACHE_NAME_NEAR);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setNearConfiguration(new NearCacheConfiguration());
+
+ return cfg;
+ }
+
+ /**
+ * @return local config
+ */
+ private CacheConfiguration getLocalConfig() {
+ CacheConfiguration cfg = defaultCacheConfiguration();
+
+ cfg.setName(CACHE_NAME_LOC);
+ cfg.setCacheMode(LOCAL);
+ cfg.setNearConfiguration(null);
+
+ return cfg;
+ }
+
+ /**
+ * Test Double Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDhtDoubleDestroy() throws Exception {
+ dhtDestroy();
+
+ dhtDestroy();
+ }
+
+ /**
+ * Test DHT Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ private void dhtDestroy() throws Exception {
+ grid(0).getOrCreateCache(getDhtConfig());
+
+ assertNull(grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL));
+
+ grid(0).cache(CACHE_NAME_DHT).put(KEY_VAL, KEY_VAL);
+
+ assertEquals(KEY_VAL, grid(0).cache(CACHE_NAME_DHT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(1).cache(CACHE_NAME_DHT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(2).cache(CACHE_NAME_DHT).get(KEY_VAL));
+
+ assertFalse(grid(0).configuration().isClientMode());
+
+ // DHT Destroy. Cache should be removed from each node.
+
+ IgniteCache<Object, Object> cache = grid(0).cache(CACHE_NAME_DHT);
+
+ cache.destroy();
+
+ checkDestroyed(cache);
+ }
+
+ /**
+ * Test Double Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientDoubleDestroy() throws Exception {
+ clientDestroy();
+
+ clientDestroy();
+ }
+
+ /**
+ * Test Client Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ private void clientDestroy() throws Exception {
+ grid(0).getOrCreateCache(getClientConfig());
+
+ assertNull(grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
+
+ grid(0).cache(CACHE_NAME_CLIENT).put(KEY_VAL, KEY_VAL);
+
+ assertEquals(KEY_VAL, grid(0).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(1).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
+ assertEquals(KEY_VAL, grid(2).cache(CACHE_NAME_CLIENT).get(KEY_VAL));
+
+ // DHT Destroy from client node. Cache should be removed from each node.
+
+ assertTrue(grid(2).configuration().isClientMode());
+
+ IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME_CLIENT);
+
+ cache.destroy(); // Client node.
+
+ checkDestroyed(cache);
+ }
+
+ /**
+ * Test Double Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearDoubleDestroy() throws Exception {
+ nearDestroy();
+
+ nearDestroy();
+ }
+
+ /**
+ * Test Near Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ private void nearDestroy() throws Exception {
+ grid(0).getOrCreateCache(getNearConfig());
+
+ grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration());
+
+ assertNull(grid(0).cache(CACHE_NAME_NEAR).get(KEY_VAL));
+ assertNull(grid(2).cache(CACHE_NAME_NEAR).get(KEY_VAL));
+
+ grid(2).cache(CACHE_NAME_NEAR).put(KEY_VAL, KEY_VAL);
+ grid(0).cache(CACHE_NAME_NEAR).put(KEY_VAL, "near-test");
+
+ assertEquals("near-test", grid(2).cache(CACHE_NAME_NEAR).localPeek(KEY_VAL));
+
+ // Near cache destroy. Cache should be removed from each node.
+
+ IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME_NEAR);
+
+ cache.destroy();
+
+ checkDestroyed(cache);
+ }
+
+ /**
+ * Test Double Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ public void testLocalDoubleDestroy() throws Exception {
+ localDestroy();
+
+ localDestroy();
+ }
+
+ /**
+ * Test Local Destroy.
+ *
+ * @throws Exception If failed.
+ */
+ private void localDestroy() throws Exception {
+ grid(0).getOrCreateCache(getLocalConfig());
+
+ assert grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL) == null;
+ assert grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL) == null;
+
+ grid(0).cache(CACHE_NAME_LOC).put(KEY_VAL, KEY_VAL + 0);
+ grid(1).cache(CACHE_NAME_LOC).put(KEY_VAL, KEY_VAL + 1);
+
+ assert grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 0);
+ assert grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 1);
+
+ grid(0).cache(CACHE_NAME_LOC).destroy();
+
+ assertNull(grid(0).cache(CACHE_NAME_LOC));
+ }
+
+ /**
+ * Test Dht close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDhtClose() throws Exception {
+ IgniteCache<Integer, Integer> dhtCache0 = grid(0).getOrCreateCache(getDhtConfig());
+
+ final Integer key = primaryKey(dhtCache0);
+
+ assertNull(dhtCache0.get(key));
+
+ dhtCache0.put(key, key);
+
+ assertEquals(key, dhtCache0.get(key));
+
+ // DHT Close. No-op.
+
+ IgniteCache<Integer, Integer> dhtCache1 = grid(1).cache(CACHE_NAME_DHT);
+ IgniteCache<Integer, Integer> dhtCache2 = grid(2).cache(CACHE_NAME_DHT);
+
+ dhtCache0.close();
+
+ try {
+ dhtCache0.get(key);// Not affected, but can not be taken.
+
+ fail();
+ }
+ catch (IllegalStateException ignored) {
+ // No-op
+ }
+
+ assertEquals(key, dhtCache1.get(key)); // Not affected.
+ assertEquals(key, dhtCache2.get(key));// Not affected.
+
+ // DHT Creation after closed.
+
+ IgniteCache<Integer, Integer> dhtCache0New = grid(0).cache(CACHE_NAME_DHT);
+
+ assertNotSame(dhtCache0, dhtCache0New);
+
+ assertEquals(key, dhtCache0New.get(key)); // Not affected, can be taken since cache reopened.
+
+ dhtCache2.put(key, key + 1);
+
+ assertEquals((Object)(key + 1), dhtCache0New.get(key));
+
+ // Check close at last node.
+
+ stopAllGrids(true);
+
+ startGrid(0);
+
+ dhtCache0 = grid(0).getOrCreateCache(getDhtConfig());
+
+ assertNull(dhtCache0.get(key));
+
+ dhtCache0.put(key, key);
+
+ assertEquals(key, dhtCache0.get(key));
+
+ // Closing last node.
+ dhtCache0.close();
+
+ try {
+ dhtCache0.get(key);// Can not be taken.
+
+ fail();
+ }
+ catch (IllegalStateException ignored) {
+ // No-op
+ }
+
+ // Reopening cache.
+ dhtCache0 = grid(0).cache(CACHE_NAME_DHT);
+
+ assertEquals(key, dhtCache0.get(key)); // Entry not loosed.
+ }
+
+ /**
+ * Test Dht close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDhtCloseWithTry() throws Exception {
+ String curVal = null;
+
+ for (int i = 0; i < 3; i++) {
+ try (IgniteCache<String, String> cache0 = grid(0).getOrCreateCache(getDhtConfig())) {
+ IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_DHT);
+ IgniteCache<String, String> cache2 = grid(2).cache(CACHE_NAME_DHT);
+
+ if (i == 0) {
+ assert cache0.get(KEY_VAL) == null;
+ assert cache1.get(KEY_VAL) == null;
+ assert cache2.get(KEY_VAL) == null;
+ }
+ else {
+ assert cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL).equals(curVal);
+ }
+
+ curVal = KEY_VAL + curVal;
+
+ cache0.put(KEY_VAL, curVal);
+
+ assert cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL).equals(curVal);
+ }
+ }
+ }
+
+ /**
+ * Test Client close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientClose() throws Exception {
+ IgniteCache<String, String> cache0 = grid(0).getOrCreateCache(getClientConfig());
+
+ assert cache0.get(KEY_VAL) == null;
+
+ cache0.put(KEY_VAL, KEY_VAL);
+
+ assert cache0.get(KEY_VAL).equals(KEY_VAL);
+
+ // DHT Close from client node. Should affect only client node.
+
+ IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_CLIENT);
+ IgniteCache<String, String> cache2 = grid(2).cache(CACHE_NAME_CLIENT);
+
+ assert cache2.get(KEY_VAL).equals(KEY_VAL);
+
+ cache2.close();// Client node.
+
+ assert cache0.get(KEY_VAL).equals(KEY_VAL);// Not affected.
+ assert cache1.get(KEY_VAL).equals(KEY_VAL);// Not affected.
+
+ try {
+ cache2.get(KEY_VAL);// Affected.
+
+ assert false;
+ }
+ catch (IllegalStateException ignored) {
+ // No-op
+ }
+
+ // DHT Creation from client node after closed.
+ IgniteCache<String, String> cache2New = grid(2).cache(CACHE_NAME_CLIENT);
+
+ assertNotSame(cache2, cache2New);
+
+ assert cache2New.get(KEY_VAL).equals(KEY_VAL);
+
+ cache0.put(KEY_VAL, KEY_VAL + "recreated");
+
+ assert cache0.get(KEY_VAL).equals(KEY_VAL + "recreated");
+ assert cache1.get(KEY_VAL).equals(KEY_VAL + "recreated");
+ assert cache2New.get(KEY_VAL).equals(KEY_VAL + "recreated");
+ }
+
+ /**
+ * Test Client close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientCloseWithTry() throws Exception {
+ String curVal = null;
+
+ for (int i = 0; i < 3; i++) {
+ try (IgniteCache<String, String> cache2 = grid(2).getOrCreateCache(getClientConfig())) {
+ IgniteCache<String, String> cache0 = grid(0).cache(CACHE_NAME_CLIENT);
+ IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_CLIENT);
+
+ if (i == 0) {
+ assert cache0.get(KEY_VAL) == null;
+ assert cache1.get(KEY_VAL) == null;
+ assert cache2.get(KEY_VAL) == null;
+ }
+ else {
+ assert cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL).equals(curVal);
+ }
+
+ curVal = KEY_VAL + curVal;
+
+ cache2.put(KEY_VAL, curVal);
+
+ assert cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL).equals(curVal);
+ }
+
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /**
+ * Test Near close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearClose() throws Exception {
+ IgniteCache<String, String> cache0 = grid(0).getOrCreateCache(getNearConfig());
+
+ // GridDhtTxPrepareRequest requests to Client node will be counted.
+ CountingTxRequestsToClientNodeTcpCommunicationSpi.nodeFilter = grid(2).context().localNodeId();
+
+ // Near Close from client node.
+
+ IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_NEAR);
+ IgniteCache<String, String> cache2 = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration());
+
+ assert cache2.get(KEY_VAL) == null;
+
+ // Subscribing to events.
+ cache2.put(KEY_VAL, KEY_VAL);
+
+ CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.set(0);
+
+ cache0.put(KEY_VAL, "near-test");
+
+ U.sleep(1000);
+
+ //Ensure near cache was automatically updated.
+ assert CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.get() != 0;
+
+ assert cache2.localPeek(KEY_VAL).equals("near-test");
+
+ cache2.close();
+
+ CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.set(0);
+
+ // Should not produce messages to client node.
+ cache0.put(KEY_VAL, KEY_VAL + 0);
+
+ U.sleep(1000);
+
+ // Ensure near cache was NOT automatically updated.
+ assert CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.get() == 0;
+
+ assert cache0.get(KEY_VAL).equals(KEY_VAL + 0);// Not affected.
+ assert cache1.get(KEY_VAL).equals(KEY_VAL + 0);// Not affected.
+
+ try {
+ cache2.get(KEY_VAL);// Affected.
+
+ assert false;
+ }
+ catch (IllegalArgumentException | IllegalStateException ignored) {
+ // No-op
+ }
+
+ // Near Creation from client node after closed.
+
+ IgniteCache<String, String> cache2New = grid(2).createNearCache(CACHE_NAME_NEAR, new NearCacheConfiguration());
+
+ assertNotSame(cache2, cache2New);
+
+ // Subscribing to events.
+ cache2New.put(KEY_VAL, KEY_VAL);
+
+ assert cache2New.localPeek(KEY_VAL).equals(KEY_VAL);
+
+ cache0.put(KEY_VAL, KEY_VAL + "recreated");
+
+ assert cache0.get(KEY_VAL).equals(KEY_VAL + "recreated");
+ assert cache1.get(KEY_VAL).equals(KEY_VAL + "recreated");
+ assert cache2New.localPeek(KEY_VAL).equals(KEY_VAL + "recreated");
+ }
+
+ /**
+ * Test Near close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearCloseWithTry() throws Exception {
+ String curVal = null;
+
+ grid(0).getOrCreateCache(getNearConfig());
+
+ NearCacheConfiguration nearCfg = new NearCacheConfiguration();
+
+ for (int i = 0; i < 3; i++) {
+ try (IgniteCache<String, String> cache2 = grid(2).getOrCreateNearCache(CACHE_NAME_NEAR, nearCfg)) {
+ IgniteCache<String, String> cache0 = grid(0).cache(CACHE_NAME_NEAR);
+ IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_NEAR);
+
+ assert cache2.localPeek(KEY_VAL) == null;
+
+ assert cache0.get(KEY_VAL) == null || cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL) == null || cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL) == null || cache2.get(KEY_VAL).equals(curVal);
+
+ curVal = KEY_VAL + curVal;
+
+ cache2.put(KEY_VAL, curVal);
+
+ assert cache2.localPeek(KEY_VAL).equals(curVal);
+
+ assert cache0.get(KEY_VAL).equals(curVal);
+ assert cache1.get(KEY_VAL).equals(curVal);
+ assert cache2.get(KEY_VAL).equals(curVal);
+ }
+ }
+ }
+
+ /**
+ * Test Local close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testLocalClose() throws Exception {
+ grid(0).getOrCreateCache(getLocalConfig());
+
+ assert grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL) == null;
+ assert grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL) == null;
+
+ grid(0).cache(CACHE_NAME_LOC).put(KEY_VAL, KEY_VAL + 0);
+ grid(1).cache(CACHE_NAME_LOC).put(KEY_VAL, KEY_VAL + 1);
+
+ assert grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 0);
+ assert grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + 1);
+
+ // Local close. Same as Local destroy.
+
+ IgniteCache<Object, Object> cache = grid(1).cache(CACHE_NAME_LOC);
+
+ cache.close();
+
+ checkUsageFails(cache);
+
+ assertNull(grid(1).cache(CACHE_NAME_LOC));
+
+ // Local creation after closed.
+
+ grid(0).getOrCreateCache(getLocalConfig());
+
+ grid(0).cache(CACHE_NAME_LOC).put(KEY_VAL, KEY_VAL + "recreated0");
+ grid(1).cache(CACHE_NAME_LOC).put(KEY_VAL, KEY_VAL + "recreated1");
+ grid(2).cache(CACHE_NAME_LOC).put(KEY_VAL, KEY_VAL + "recreated2");
+
+ assert grid(0).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + "recreated0");
+ assert grid(1).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + "recreated1");
+ assert grid(2).cache(CACHE_NAME_LOC).get(KEY_VAL).equals(KEY_VAL + "recreated2");
+ }
+
+ /**
+ * Test Local close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testLocalCloseWithTry() throws Exception {
+ String curVal = null;
+
+ for (int i = 0; i < 3; i++) {
+ try (IgniteCache<String, String> cache2 = grid(2).getOrCreateCache(getLocalConfig())) {
+ IgniteCache<String, String> cache0 = grid(0).cache(CACHE_NAME_LOC);
+ IgniteCache<String, String> cache1 = grid(1).cache(CACHE_NAME_LOC);
+
+ assert cache0.get(KEY_VAL) == null;
+ assert cache1.get(KEY_VAL) == null;
+ assert cache2.get(KEY_VAL) == null;
+
+ curVal = KEY_VAL + curVal;
+
+ cache0.put(KEY_VAL, curVal + 1);
+ cache1.put(KEY_VAL, curVal + 2);
+ cache2.put(KEY_VAL, curVal + 3);
+
+ assert cache0.get(KEY_VAL).equals(curVal + 1);
+ assert cache1.get(KEY_VAL).equals(curVal + 2);
+ assert cache2.get(KEY_VAL).equals(curVal + 3);
+ }
+ }
+ }
+
+ /**
+ * Tests concurrent close.
+ *
+ * @throws Exception If failed.
+ */
+ public void testConcurrentCloseSetWithTry() throws Exception {
+ final AtomicInteger a1 = new AtomicInteger();
+ final AtomicInteger a2 = new AtomicInteger();
+ final AtomicInteger a3 = new AtomicInteger();
+ final AtomicInteger a4 = new AtomicInteger();
+
+ Thread t1 = new Thread(new Runnable() {
+ @Override public void run() {
+ Thread.currentThread().setName("test-thread-1");
+
+ closeWithTry(a1, 0);
+ }
+ });
+ Thread t2 = new Thread(new Runnable() {
+ @Override public void run() {
+ Thread.currentThread().setName("test-thread-2");
+
+ closeWithTry(a2, 0);
+ }
+ });
+ Thread t3 = new Thread(new Runnable() {
+ @Override public void run() {
+ Thread.currentThread().setName("test-thread-3");
+
+ closeWithTry(a3, 2);
+ }
+ });
+ Thread t4 = new Thread(new Runnable() {
+ @Override public void run() {
+ Thread.currentThread().setName("test-thread-4");
+
+ closeWithTry(a4, 2);
+ }
+ });
+
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(getDhtConfig());
+
+ cache.close();
+
+ t1.start();
+ t2.start();
+ t3.start();
+ t4.start();
+
+ try {
+ U.sleep(1000);
+ }
+ finally {
+ stop = true;
+ }
+
+ t1.join();
+ t2.join();
+ t3.join();
+ t4.join();
+
+ assert a1.get() > 1;
+ assert a2.get() > 1;
+ assert a3.get() > 1;
+ assert a4.get() > 1;
+
+ checkUsageFails(cache);
+ }
+
+ /**
+ * @param a AtomicInteger.
+ * @param node Node.
+ */
+ public void closeWithTry(AtomicInteger a, int node) {
+ while (!stop) {
+ try (IgniteCache<String, String> cache = grid(node).getOrCreateCache(getDhtConfig())) {
+ a.incrementAndGet();
+
+ assert cache.get(KEY_VAL) == null || cache.get(KEY_VAL).equals(KEY_VAL);
+
+ cache.put(KEY_VAL, KEY_VAL);
+
+ assert cache.get(KEY_VAL).equals(KEY_VAL);
+ }
+ }
+ }
+
+ /**
+ * Tests start -> destroy -> start -> close using CacheManager.
+ */
+ public void testTckStyleCreateDestroyClose() {
+ CacheManager mgr = Caching.getCachingProvider().getCacheManager();
+
+ String cacheName = "cache";
+
+ mgr.createCache(cacheName, new MutableConfiguration<Integer, String>().setTypes(Integer.class, String.class));
+
+ mgr.destroyCache(cacheName);
+
+ Cache<Integer, String> cache = mgr.createCache(cacheName,
+ new MutableConfiguration<Integer, String>().setTypes(Integer.class, String.class));
+
+ cache.close();
+
+ cache.close();
+
+ try {
+ cache.get(1);
+
+ fail();
+ }
+ catch (IllegalStateException e) {
+ // No-op;
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void checkDestroyed(IgniteCache<Object, Object> cache) throws Exception {
+ checkUsageFails(cache);
+
+ awaitPartitionMapExchange();
+
+ String cacheName = cache.getName();
+
+ for (int i = 0; i < 3; i++)
+ assertNull("Unexpected cache for node: " + i, grid(i).cache(cacheName));
+ }
+
+ /**
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void checkUsageFails(IgniteCache<Object, Object> cache) throws Exception {
+ try {
+ cache.get(0);
+
+ fail();
+ }
+ catch (IllegalStateException e) {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeDynamicStartAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeDynamicStartAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeDynamicStartAbstractTest.java
index 82667d9..2d52933 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeDynamicStartAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeDynamicStartAbstractTest.java
@@ -163,7 +163,7 @@ public abstract class CacheStoreUsageMultinodeDynamicStartAbstractTest extends C
cache = srv.cache(null);
if (cache != null)
- cache.close();
+ cache.destroy();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridProjectionForCachesOnDaemonNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridProjectionForCachesOnDaemonNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridProjectionForCachesOnDaemonNodeSelfTest.java
index b9acd99..e640f82 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridProjectionForCachesOnDaemonNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridProjectionForCachesOnDaemonNodeSelfTest.java
@@ -88,7 +88,7 @@ public class GridProjectionForCachesOnDaemonNodeSelfTest extends GridCommonAbstr
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- ignite.cache(null).close();
+ ignite.cache(null).destroy();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index cd19703..d1f8016 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -175,7 +175,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
- futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME));
+ futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
return null;
}
@@ -237,7 +237,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
@Override public Object call() throws Exception {
IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
- futs.add(kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME));
+ futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
return null;
}
@@ -300,7 +300,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount(); g++)
caches[g] = grid(g).cache(DYNAMIC_CACHE_NAME);
- kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
for (int g = 0; g < nodeCount(); g++) {
final IgniteKernal kernal0 = (IgniteKernal) grid(g);
@@ -353,7 +353,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}
// Undeploy cache.
- kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
startGrid(nodeCount() + 1);
@@ -430,7 +430,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}, IllegalArgumentException.class, null);
}
- kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
stopGrid(nodeCount() + 1);
stopGrid(nodeCount());
@@ -483,7 +483,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- IgniteKernal ignite = (IgniteKernal) grid(nodeCount());
+ IgniteKernal ignite = (IgniteKernal)grid(nodeCount());
return ignite.getCache(DYNAMIC_CACHE_NAME);
}
@@ -497,7 +497,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
}
finally {
stopGrid(nodeCount());
@@ -539,7 +539,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
}
finally {
stopGrid(nodeCount());
@@ -585,7 +585,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
}
finally {
stopGrid(nodeCount());
@@ -638,10 +638,15 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
ignite(i).events().localListen(lsnrs[i], EventType.EVTS_CACHE_LIFECYCLE);
}
- try (IgniteCache<Object, Object> ignored = ignite(0).createCache(cfg)) {
+ IgniteCache<Object, Object> cache = ignite(0).createCache(cfg);
+
+ try {
for (CountDownLatch start : starts)
start.await();
}
+ finally {
+ cache.destroy();
+ }
for (CountDownLatch stop : stops)
stop.await();
@@ -665,28 +670,29 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
ccfg.setCacheMode(CacheMode.PARTITIONED);
ccfg.setNodeFilter(NODE_FILTER);
- try (IgniteCache cache = ig.createCache(ccfg, new NearCacheConfiguration())) {
- assertNotNull(cache);
+ IgniteCache cache = ig.createCache(ccfg, new NearCacheConfiguration());
+ assertNotNull(cache);
- GridCacheAdapter<Object, Object> cacheAdapter =
- ((IgniteKernal)ig).internalCache(DYNAMIC_CACHE_NAME);
+ GridCacheAdapter<Object, Object> cacheAdapter =
+ ((IgniteKernal)ig).internalCache(DYNAMIC_CACHE_NAME);
- assertNotNull(cacheAdapter);
- assertFalse(cacheAdapter.context().affinityNode());
- assertTrue(cacheAdapter.context().isNear());
+ assertNotNull(cacheAdapter);
+ assertFalse(cacheAdapter.context().affinityNode());
+ assertTrue(cacheAdapter.context().isNear());
- try {
- IgniteEx grid = startGrid(nodeCount() + 1);
+ try {
+ IgniteEx grid = startGrid(nodeCount() + 1);
- // Check that new node sees near node.
- GridDiscoveryManager disco = grid.context().discovery();
+ // Check that new node sees near node.
+ GridDiscoveryManager disco = grid.context().discovery();
- assertTrue(disco.cacheNearNode(disco.node(ig.cluster().localNode().id()),
- DYNAMIC_CACHE_NAME));
- }
- finally {
- stopGrid(nodeCount() + 1);
- }
+ assertTrue(disco.cacheNearNode(disco.node(ig.cluster().localNode().id()),
+ DYNAMIC_CACHE_NAME));
+ }
+ finally {
+ cache.destroy();
+
+ stopGrid(nodeCount() + 1);
}
}
finally {
@@ -955,14 +961,14 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
cfg.setNodeFilter(F.not(NODE_FILTER));
- try (IgniteCache<Object, Object> ignored = ignite(0).createCache(cfg)) {
+ IgniteCache<Object, Object> cache = ignite(0).createCache(cfg);
- final CountDownLatch[] latches = new CountDownLatch[nodeCount()];
+ final CountDownLatch[] latches = new CountDownLatch[nodeCount()];
- IgnitePredicate[] lsnrs = new IgnitePredicate[nodeCount()];
+ IgnitePredicate[] lsnrs = new IgnitePredicate[nodeCount()];
- for (int i = 0; i < nodeCount(); i++) {
- final int idx = i;
+ for (int i = 0; i < nodeCount(); i++) {
+ final int idx = i;
latches[i] = new CountDownLatch(1);
lsnrs[i] = new IgnitePredicate<CacheEvent>() {
@@ -971,29 +977,30 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
case EventType.EVT_CACHE_NODES_LEFT:
latches[idx].countDown();
- break;
+ break;
- default:
- assert false;
- }
+ default:
+ assert false;
+ }
- assertEquals(DYNAMIC_CACHE_NAME, e.cacheName());
+ assertEquals(DYNAMIC_CACHE_NAME, e.cacheName());
- return true;
- }
- };
+ return true;
+ }
+ };
- ignite(i).events().localListen(lsnrs[i], EventType.EVTS_CACHE_LIFECYCLE);
- }
+ ignite(i).events().localListen(lsnrs[i], EventType.EVTS_CACHE_LIFECYCLE);
+ }
- stopGrid(nodeCount());
+ stopGrid(nodeCount());
- for (CountDownLatch latch : latches)
- latch.await();
+ for (CountDownLatch latch : latches)
+ latch.await();
- for (int i = 0; i < nodeCount(); i++)
- ignite(i).events().stopLocalListen(lsnrs[i]);
- }
+ for (int i = 0; i < nodeCount(); i++)
+ ignite(i).events().stopLocalListen(lsnrs[i]);
+
+ cache.destroy();
}
/**
@@ -1007,7 +1014,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
try {
CacheConfiguration cfg = new CacheConfiguration(DYNAMIC_CACHE_NAME);
- try (IgniteCache cache = ignite(0).createCache(cfg)) {
+ IgniteCache cache = ignite(0).createCache(cfg);
+
+ try {
for (int i = 0; i < 100; i++) {
assertFalse(ignite(0).affinity(DYNAMIC_CACHE_NAME).mapKeyToPrimaryAndBackups(i)
.contains(dNode.cluster().localNode()));
@@ -1015,6 +1024,9 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
cache.put(i, i);
}
}
+ finally {
+ cache.destroy();
+ }
}
finally {
stopGrid(nodeCount());
@@ -1027,23 +1039,25 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testAwaitPartitionMapExchange() throws Exception {
- try (IgniteCache ignored = grid(0).getOrCreateCache(new CacheConfiguration(DYNAMIC_CACHE_NAME))) {
- awaitPartitionMapExchange();
+ IgniteCache cache = grid(0).getOrCreateCache(new CacheConfiguration(DYNAMIC_CACHE_NAME));
- startGrid(nodeCount());
+ awaitPartitionMapExchange();
- awaitPartitionMapExchange();
+ startGrid(nodeCount());
- startGrid(nodeCount() + 1);
+ awaitPartitionMapExchange();
- awaitPartitionMapExchange();
+ startGrid(nodeCount() + 1);
- stopGrid(nodeCount() + 1);
+ awaitPartitionMapExchange();
- awaitPartitionMapExchange();
+ stopGrid(nodeCount() + 1);
- stopGrid(nodeCount());
- }
+ awaitPartitionMapExchange();
+
+ stopGrid(nodeCount());
+
+ cache.destroy();
}
/**
@@ -1084,9 +1098,11 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
if (iter % 10 == 0)
log.info("Cache start/stop iteration: " + iter);
- try (IgniteCache<Object, Object> cache = ignite1.getOrCreateCache("cache-" + iter)) {
- assertNotNull(cache);
- }
+ IgniteCache<Object, Object> cache = ignite1.getOrCreateCache("cache-" + iter);
+
+ assertNotNull(cache);
+
+ cache.destroy();
iter++;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index d60a0c3..5a51a1b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -498,7 +498,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
spi1.reset();
spi2.reset();
- assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1"));
+ assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache(CACHE_NAME1));
if (nearCache)
ignite2.getOrCreateNearCache(CACHE_NAME1, new NearCacheConfiguration<>());
@@ -507,7 +507,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 1));
- GridCacheAdapter cache = ((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1");
+ GridCacheAdapter cache = ((IgniteKernal)ignite2).context().cache().context().cache().internalCache(CACHE_NAME1);
assertNotNull(cache);
assertEquals(nearCache, cache.context().isNear());
@@ -533,6 +533,29 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
spi1.reset();
spi2.reset();
+ AffinityTopologyVersion topVer;
+
+ if (!srvNode) {
+ log.info("Close client cache: " + CACHE_NAME1);
+
+ ignite2.cache(CACHE_NAME1).close();
+
+ assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache(CACHE_NAME1));
+
+ waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 2));
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(0, spi0.partitionsFullMessages());
+ assertEquals(0, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+ assertEquals(0, spi2.partitionsSingleMessages());
+ assertEquals(0, spi2.partitionsFullMessages());
+
+ topVer = new AffinityTopologyVersion(3, 3);
+ }
+ else
+ topVer = new AffinityTopologyVersion(3, 2);
+
final String CACHE_NAME2 = "cache2";
ccfg = new CacheConfiguration();
@@ -541,7 +564,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
ignite2.createCache(ccfg);
- waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 2));
+ waitForTopologyUpdate(3, topVer);
assertEquals(0, spi0.partitionsSingleMessages());
assertEquals(2, spi0.partitionsFullMessages());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
index 3d44600..f4b0d2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
@@ -96,7 +96,7 @@ public class CacheLocalOffHeapAndSwapMetricsSelfTest extends GridCommonAbstractT
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
if (cache != null)
- cache.close();
+ cache.destroy();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
index 12b6458..470ac79 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
@@ -76,15 +76,17 @@ public class DataStreamerMultinodeCreateCacheTest extends GridCommonAbstractTest
while (System.currentTimeMillis() < stopTime) {
String cacheName = "cache-" + threadIdx + "-" + (iter % 10);
- try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheName)) {
- try (IgniteDataStreamer<Object, Object> stmr = ignite.dataStreamer(cacheName)) {
- ((DataStreamerImpl<Object, Object>)stmr).maxRemapCount(0);
+ IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheName);
- for (int i = 0; i < 1000; i++)
- stmr.addData(i, i);
- }
+ try (IgniteDataStreamer<Object, Object> stmr = ignite.dataStreamer(cacheName)) {
+ ((DataStreamerImpl<Object, Object>)stmr).maxRemapCount(0);
+
+ for (int i = 0; i < 1000; i++)
+ stmr.addData(i, i);
}
+ cache.destroy();
+
iter++;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 12d2b05..bde3a72 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -131,6 +131,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheRemoveAllSelfTest.class);
+ suite.addTestSuite(CacheStopAndDestroySelfTest.class);
+
suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
suite.addTestSuite(CacheJdbcStoreSessionListenerSelfTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3fba883/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigurationP2PTestClient.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigurationP2PTestClient.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigurationP2PTestClient.java
index eea3a9b..701668b0 100644
--- a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigurationP2PTestClient.java
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/startcache/CacheConfigurationP2PTestClient.java
@@ -109,9 +109,9 @@ public class CacheConfigurationP2PTestClient {
if (cnt != 600)
throw new Exception("Unexpected query result: " + cnt);
- cache1.close();
+ cache1.destroy();
- cache2.close();
+ cache2.destroy();
}
}
}
[6/8] incubator-ignite git commit: # ignite-901
Posted by sb...@apache.org.
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/782c235c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/782c235c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/782c235c
Branch: refs/heads/ignite-901
Commit: 782c235cfafcb2195c6a4b505f3d8a42bf97dba1
Parents: a6222c9 6386794
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 12:14:53 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 12:14:53 2015 +0300
----------------------------------------------------------------------
.../examples/ScalarCacheAffinityExample.scala | 2 +-
.../scalar/examples/ScalarCacheExample.scala | 2 +-
.../ScalarCachePopularNumbersExample.scala | 2 +-
.../examples/ScalarCacheQueryExample.scala | 2 +-
.../examples/ScalarSnowflakeSchemaExample.scala | 4 +-
.../java/org/apache/ignite/IgniteCache.java | 14 +-
.../org/apache/ignite/cache/CacheManager.java | 13 +-
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../discovery/GridDiscoveryManager.java | 23 +-
.../cache/DynamicCacheChangeRequest.java | 39 +-
.../processors/cache/GridCacheGateway.java | 4 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 102 ++-
.../processors/cache/IgniteCacheProxy.java | 448 +++++++---
.../distributed/dht/GridDhtCacheEntry.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 30 +-
.../datastreamer/DataStreamProcessor.java | 3 +
.../ignite/internal/util/IgniteUtils.java | 6 +-
.../visor/cache/VisorCacheStopTask.java | 2 +-
.../tcp/internal/TcpDiscoveryNode.java | 8 +-
.../affinity/IgniteClientNodeAffinityTest.java | 14 +-
.../IgniteFairAffinityDynamicCacheSelfTest.java | 3 +-
...cheStoreSessionListenerAbstractSelfTest.java | 111 ++-
.../GridCacheTxLoadFromStoreOnLockSelfTest.java | 34 +-
.../CacheMetricsForClusterGroupSelfTest.java | 10 +-
.../cache/CacheOffheapMapEntrySelfTest.java | 7 +-
.../cache/CacheStopAndDestroySelfTest.java | 859 +++++++++++++++++++
...eUsageMultinodeDynamicStartAbstractTest.java | 2 +-
...ProjectionForCachesOnDaemonNodeSelfTest.java | 2 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 140 +--
...teCacheClientNodePartitionsExchangeTest.java | 29 +-
...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 2 +-
.../DataStreamerMultinodeCreateCacheTest.java | 14 +-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../CacheConfigurationP2PTestClient.java | 4 +-
35 files changed, 1603 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index da409a3,f2beb0a..eeb9b7c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@@ -99,14 -68,20 +99,14 @@@ public class GridCacheGateway<K, V>
*
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
- public boolean enterIfNotClosed() {
+ public boolean enterIfNotStopped() {
onEnter();
- // Must unlock in case of unexpected errors to avoid
- // deadlocks during kernal stop.
+ // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
-
- return false;
- }
+ return checkState(true, false);
- return true;
}
/**
@@@ -114,10 -89,10 +114,10 @@@
*
* @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
*/
- public boolean enterIfNotClosedNoLock() {
+ public boolean enterIfNotStoppedNoLock() {
onEnter();
- return !stopped;
+ return checkState(false, false);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 767b62a,bb87a86..46f9206
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -1602,13 -1521,19 +1608,19 @@@ public class GridCacheProcessor extend
* @param req Stop request.
*/
public void blockGateway(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close();
- // Break the proxy before exchange future is done.
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
+ if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
+ // Break the proxy before exchange future is done.
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
- if (proxy != null)
- proxy.gate().stopped();
+ if (proxy != null) {
+ if (req.stop())
- proxy.gate().block();
++ proxy.gate().stopped();
+ else
+ proxy.closeProxy();
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/782c235c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------