You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 23:21:05 UTC
[32/50] [abbrv] incubator-ignite git commit: #ignite-373: Cache is
not empty after removeAll.
#ignite-373: Cache is not empty after removeAll.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/593e3eee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/593e3eee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/593e3eee
Branch: refs/heads/ignite-471
Commit: 593e3eeeb0d4965b1c1a83d4f68a9d18e6615632
Parents: 7c91389
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:35:27 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:35:27 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 119 +++++------
.../GridDistributedCacheAdapter.java | 210 ++++++++++++-------
.../cache/CacheRemoveAllSelfTest.java | 81 +++++++
.../near/NoneRebalanceModeSelfTest.java | 67 ++++++
.../testsuites/IgniteCacheTestSuite2.java | 1 +
.../testsuites/IgniteCacheTestSuite4.java | 2 +
6 files changed, 338 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3826bfa..4106cb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1133,7 +1133,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get();
+ ctx.kernalContext().task().execute(
+ new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null).get();
}
}
@@ -1152,7 +1153,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!nodes.isEmpty()) {
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null);
+ return ctx.kernalContext().task().execute(
+ new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null);
}
else
return new GridFinishedFuture<>();
@@ -3571,7 +3573,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null);
+ return ctx.kernalContext().task().execute(
+ new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null);
}
/** {@inheritDoc} */
@@ -4827,13 +4830,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private static final long serialVersionUID = 0L;
/**
- * Empty constructor for serialization.
- */
- public GlobalClearAllJob() {
- // No-op.
- }
-
- /**
* @param cacheName Cache name.
* @param topVer Affinity topology version.
*/
@@ -4859,14 +4855,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private static final long serialVersionUID = 0L;
/** Keys to remove. */
- private Set<? extends K> keys;
-
- /**
- * Empty constructor for serialization.
- */
- public GlobalClearKeySetJob() {
- // No-op.
- }
+ private final Set<? extends K> keys;
/**
* @param cacheName Cache name.
@@ -4897,14 +4886,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private static final long serialVersionUID = 0L;
/** Peek modes. */
- private CachePeekMode[] peekModes;
-
- /**
- * Required by {@link Externalizable}.
- */
- public SizeJob() {
- // No-op.
- }
+ private final CachePeekMode[] peekModes;
/**
* @param cacheName Cache name.
@@ -5514,17 +5496,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected Ignite ignite;
/** Affinity topology version. */
- protected AffinityTopologyVersion topVer;
+ protected final AffinityTopologyVersion topVer;
/** Cache name. */
- protected String cacheName;
-
- /**
- * Empty constructor for serialization.
- */
- public TopologyVersionAwareJob() {
- // No-op.
- }
+ protected final String cacheName;
/**
* @param cacheName Cache name.
@@ -5583,24 +5558,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** Cache context. */
- private GridCacheContext ctx;
+ /** Cache name. */
+ private final String cacheName;
- /** Peek modes. */
- private CachePeekMode[] peekModes;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
- /**
- * Empty constructor for serialization.
- */
- public SizeTask() {
- // No-op.
- }
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
/**
- * @param ctx Cache context.
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
+ * @param peekModes Cache peek modes.
*/
- public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) {
- this.ctx = ctx;
+ public SizeTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
this.peekModes = peekModes;
}
@@ -5610,13 +5584,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Map<ComputeJob, ClusterNode> jobs = new HashMap();
for (ClusterNode node : subgrid)
- jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node);
+ jobs.put(new SizeJob(cacheName, topVer, peekModes), node);
return jobs;
}
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
return ComputeJobResultPolicy.WAIT;
}
@@ -5640,25 +5623,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** Cache context. */
- private GridCacheContext ctx;
+ /** Cache name. */
+ private final String cacheName;
- /** Keys to clear. */
- private Set<? extends K> keys;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
- /**
- * Empty constructor for serialization.
- */
- public ClearTask() {
- // No-op.
- }
+ /** Keys to clear. */
+ private final Set<? extends K> keys;
/**
- * @param ctx Cache context.
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
* @param keys Keys to clear.
*/
- public ClearTask(GridCacheContext ctx, Set<? extends K> keys) {
- this.ctx = ctx;
+ public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
this.keys = keys;
}
@@ -5668,9 +5649,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Map<ComputeJob, ClusterNode> jobs = new HashMap();
for (ClusterNode node : subgrid) {
- jobs.put(keys == null ?
- new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) :
- new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys),
+ jobs.put(keys == null ? new GlobalClearAllJob(cacheName, topVer) :
+ new GlobalClearKeySetJob<K>(cacheName, topVer, keys),
node);
}
@@ -5679,6 +5659,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
return ComputeJobResultPolicy.WAIT;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 3a685cc..c5ef22f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.affinity.*;
@@ -30,17 +31,17 @@ import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.datastreamer.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
-import java.util.concurrent.*;
-import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
/**
* Distributed cache implementation.
@@ -142,21 +143,28 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
try {
AffinityTopologyVersion topVer;
+ boolean retry;
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ boolean skipStore = opCtx != null && opCtx.skipStore();
+
do {
+ retry = false;
+
topVer = ctx.affinity().affinityTopologyVersion();
// Send job to all data nodes.
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- ctx.closures().callAsyncNoFailover(BROADCAST,
- new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes,
- true).get();
+ retry = !ctx.kernalContext().task().execute(
+ new RemoveAllTask(ctx.name(), topVer, skipStore), null).get();
}
}
- while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
+ while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -170,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
- removeAllAsync(opFut, topVer);
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ boolean skipStore = opCtx != null && opCtx.skipStore();
+
+ removeAllAsync(opFut, topVer, skipStore);
return opFut;
}
@@ -178,27 +190,29 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/**
* @param opFut Future.
* @param topVer Topology version.
+ * @param skipStore Skip store flag.
*/
- private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) {
+ private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer,
+ final boolean skipStore) {
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,
- new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, true);
+ IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(
+ new RemoveAllTask(ctx.name(), topVer, skipStore), null);
- rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
+ rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
try {
- fut.get();
+ boolean retry = !fut.get();
AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion();
- if (topVer0.equals(topVer))
+ if (topVer0.equals(topVer) && !retry)
opFut.onDone();
else
- removeAllAsync(opFut, topVer0);
+ removeAllAsync(opFut, topVer0, skipStore);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -227,97 +241,150 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
}
/**
- * Internal callable which performs remove all primary key mappings
- * operation on a cache with the given name.
+ * Remove task.
*/
@GridInternal
- private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+ private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> {
/** */
private static final long serialVersionUID = 0L;
/** Cache name. */
- private String cacheName;
+ private final String cacheName;
- /** Topology version. */
- private AffinityTopologyVersion topVer;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
/** Skip store flag. */
- private boolean skipStore;
-
- /** Injected grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
+ private final boolean skipStore;
/**
- * Empty constructor for serialization.
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
+ * @param skipStore Skip store flag.
*/
- public GlobalRemoveAllCallable() {
- // No-op.
+ public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ this.skipStore = skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) throws IgniteException {
+ Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+ for (ComputeJobResult locRes : results) {
+ if (locRes != null && (locRes.getException() != null || !locRes.<Boolean>getData()))
+ return false;
+ }
+
+ return true;
}
+ }
+ /**
+ * Internal job which performs remove all primary key mappings
+ * operation on a cache with the given name.
+ */
+ @GridInternal
+ private static class GlobalRemoveAllJob<K,V> extends TopologyVersionAwareJob {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Skip store flag. */
+ private final boolean skipStore;
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @param skipStore Skip store flag.
*/
- private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
- this.cacheName = cacheName;
- this.topVer = topVer;
+ private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
+ super(cacheName, topVer);
+
this.skipStore = skipStore;
}
- /**
- * {@inheritDoc}
- */
- @Override public Object call() throws Exception {
- GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+ /** {@inheritDoc} */
+ @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) {
+ GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName);
- final GridCacheContext<K, V> ctx = cacheAdapter.context();
+ if (cache == null)
+ return true;
- ctx.affinity().affinityReadyFuture(topVer).get();
+ final GridCacheContext<K, V> ctx = cache.context();
ctx.gate().enter();
try {
if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
- return null; // Ignore this remove request because remove request will be sent again.
+ return false; // Ignore this remove request because remove request will be sent again.
GridDhtCacheAdapter<K, V> dht;
GridNearCacheAdapter<K, V> near = null;
- if (cacheAdapter instanceof GridNearCacheAdapter) {
- near = ((GridNearCacheAdapter<K, V>)cacheAdapter);
+ if (cache instanceof GridNearCacheAdapter) {
+ near = ((GridNearCacheAdapter<K, V>) cache);
dht = near.dht();
}
else
- dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
+ dht = (GridDhtCacheAdapter<K, V>) cache;
try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
- (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
- ((DataStreamerImpl)dataLdr).maxRemapCount(0);
+ (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
+ ((DataStreamerImpl) dataLdr).maxRemapCount(0);
dataLdr.skipStore(skipStore);
dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
- for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
- if (!locPart.isEmpty() && locPart.primary(topVer)) {
- for (GridDhtCacheEntry o : locPart.entries()) {
- if (!o.obsoleteOrDeleted())
- dataLdr.removeDataInternal(o.key());
- }
- }
- }
+ for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) {
+ GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
- Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer);
+ if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve())
+ return false;
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ try {
+ if (!locPart.isEmpty()) {
+ for (GridDhtCacheEntry o : locPart.entries()) {
+ if (!o.obsoleteOrDeleted())
+ dataLdr.removeDataInternal(o.key());
+ }
+ }
- it = dht.context().swap().swapKeyIterator(true, false, topVer);
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+ dht.context().swap().iterator(part);
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ if (iter != null) {
+ for (Map.Entry<byte[], GridCacheSwapEntry> e : iter)
+ dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+ }
+ }
+ finally {
+ locPart.release();
+ }
+ }
}
if (near != null) {
@@ -329,25 +396,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
}
}
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
ctx.gate().leave();
}
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
- out.writeObject(topVer);
- out.writeBoolean(skipStore);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
- topVer = (AffinityTopologyVersion)in.readObject();
- skipStore = in.readBoolean();
+ return true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
new file mode 100644
index 0000000..f5de96f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Test remove all method.
+ */
+public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveAll() throws Exception {
+ IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+ for (int i = 0; i < 10_000; ++i)
+ cache.put(i, "val");
+
+ final AtomicInteger igniteId = new AtomicInteger(gridCount());
+
+ IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < 2; ++i)
+ startGrid(igniteId.getAndIncrement());
+
+ return true;
+ }
+ }, 3, "start-node-thread");
+
+ cache.removeAll();
+
+ fut.get();
+
+ U.sleep(5000);
+
+ for (int i = 0; i < igniteId.get(); ++i) {
+ IgniteCache locCache = grid(i).cache(null);
+
+ assertEquals("Local size: " + locCache.localSize() + "\n" +
+ "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" +
+ "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" +
+ "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" +
+ "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" +
+ "Backup: " + locCache.localSize(CachePeekMode.BACKUP),
+ 0, locCache.localSize());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
new file mode 100644
index 0000000..d61ddcc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Test none rebalance mode.
+ */
+public class NoneRebalanceModeSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @SuppressWarnings({"ConstantConditions"})
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ CacheConfiguration cc = defaultCacheConfiguration();
+
+ cc.setRebalanceMode(NONE);
+
+ c.setCacheConfiguration(cc);
+
+ return c;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveAll() throws Exception {
+ GridNearTransactionalCache cache = (GridNearTransactionalCache)((IgniteKernal)grid(0)).internalCache(null);
+
+ for (GridDhtLocalPartition part : cache.dht().topology().localPartitions())
+ assertEquals(MOVING, part.state());
+
+ grid(0).cache(null).removeAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dc3a2c0..5738778 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -58,6 +58,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTestSuite(GridCachePartitionedGetSelfTest.class);
suite.addTest(new TestSuite(GridCachePartitionedBasicApiTest.class));
suite.addTest(new TestSuite(GridCacheNearMultiGetSelfTest.class));
+ suite.addTest(new TestSuite(NoneRebalanceModeSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearJobExecutionSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearOneNodeSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearMultiNodeSelfTest.class));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 8eb0688..aaf7e5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -126,6 +126,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheNoValueClassOnServerNodeTest.class);
+ suite.addTestSuite(CacheRemoveAllSelfTest.class);
+
suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
return suite;