You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/12 13:30:49 UTC
[30/37] incubator-ignite git commit: #ignite-373: Get locks for
partitions in removeAll().
#ignite-373: Get locks for partitions in removeAll().
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1ffe1158
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1ffe1158
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1ffe1158
Branch: refs/heads/ignite-373
Commit: 1ffe1158e95145bc7f0912d8e912403757add60c
Parents: cb09f7c
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue May 12 13:36:30 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue May 12 13:36:30 2015 +0300
----------------------------------------------------------------------
.../GridDistributedCacheAdapter.java | 73 +++++++++++++-------
1 file changed, 47 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ffe1158/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index b4417a0..29a806b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.datastreamer.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
@@ -41,6 +42,7 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
/**
* Distributed cache implementation.
@@ -142,10 +144,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
try {
AffinityTopologyVersion topVer;
- boolean removed;
+ boolean removedAll;
do {
- removed = true;
+ removedAll = true;
topVer = ctx.affinity().affinityTopologyVersion();
@@ -156,17 +158,17 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
if (!nodes.isEmpty()) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
- Collection<Object> results = ctx.closures().callAsyncNoFailover(BROADCAST,
+ Collection<Boolean> results = ctx.closures().callAsyncNoFailover(BROADCAST,
Collections.singleton(new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore())), nodes,
true).get();
- for (Object res : results) {
- if (res != null)
- removed = false;
+ for (Boolean res : results) {
+ if (res != null && !res)
+ removedAll = false;
}
}
}
- while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removed);
+ while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removedAll);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -241,7 +243,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
* operation on a cache with the given name.
*/
@GridInternal
- private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+ private static class GlobalRemoveAllCallable<K,V> implements Callable<Boolean>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -279,11 +281,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/**
* {@inheritDoc}
*/
- @Override public Object call() throws Exception {
+ @Override public Boolean call() throws Exception {
GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
if (cacheAdapter == null)
- return new Integer(-1);
+ return false;
final GridCacheContext<K, V> ctx = cacheAdapter.context();
@@ -293,7 +295,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
try {
if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
- return new Integer(-1); // Ignore this remove request because remove request will be sent again.
+ return false; // Ignore this remove request because remove request will be sent again.
GridDhtCacheAdapter<K, V> dht;
GridNearCacheAdapter<K, V> near = null;
@@ -313,24 +315,43 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
- for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
- if (!locPart.isEmpty() && locPart.primary(topVer)) {
- for (GridDhtCacheEntry o : locPart.entries()) {
- if (!o.obsoleteOrDeleted())
- dataLdr.removeDataInternal(o.key());
- }
+ for (GridDhtLocalPartition locPart : dht.topology().localPartitions()) {
+ if (locPart.state() == EVICTED) {
+ assert locPart.entries().size() == 0;
+
+ continue;
}
- }
- Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer);
+ if (locPart == null || locPart.state() != OWNING || !locPart.reserve())
+ return false;
+
+ try {
+ if (!locPart.isEmpty() && locPart.primary(topVer)) {
+ for (GridDhtCacheEntry o : locPart.entries()) {
+ if (!ctx.affinity().belongs(ctx.localNode(), locPart.id(), dht.topology().topologyVersion()))
+ return false;
+
+ if (!o.obsoleteOrDeleted())
+ dataLdr.removeDataInternal(o.key());
+ }
+ }
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+ ctx.swap().iterator(locPart.id());
- it = dht.context().swap().swapKeyIterator(true, false, topVer);
+ if (iter != null) {
+ for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+ if (!ctx.affinity().belongs(ctx.localNode(), locPart.id(), dht.topology().topologyVersion()))
+ return false;
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+ }
+ }
+ }
+ finally {
+ locPart.release();
+ }
+ }
}
if (near != null) {
@@ -347,9 +368,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
}
if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
- return new Integer(-1);
+ return false;
- return null;
+ return true;
}
/** {@inheritDoc} */