You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/12 13:30:49 UTC

[30/37] incubator-ignite git commit: #ignite-373: Get locks for partitions in removeAll().

#ignite-373: Get locks for partitions in removeAll().


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1ffe1158
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1ffe1158
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1ffe1158

Branch: refs/heads/ignite-373
Commit: 1ffe1158e95145bc7f0912d8e912403757add60c
Parents: cb09f7c
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue May 12 13:36:30 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue May 12 13:36:30 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedCacheAdapter.java            | 73 +++++++++++++-------
 1 file changed, 47 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ffe1158/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index b4417a0..29a806b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
@@ -41,6 +42,7 @@ import java.util.*;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 
 /**
  * Distributed cache implementation.
@@ -142,10 +144,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         try {
             AffinityTopologyVersion topVer;
 
-            boolean removed;
+            boolean removedAll;
 
             do {
-                removed = true;
+                removedAll = true;
 
                 topVer = ctx.affinity().affinityTopologyVersion();
 
@@ -156,17 +158,17 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 if (!nodes.isEmpty()) {
                     CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-                    Collection<Object> results = ctx.closures().callAsyncNoFailover(BROADCAST,
+                    Collection<Boolean> results = ctx.closures().callAsyncNoFailover(BROADCAST,
                         Collections.singleton(new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore())), nodes,
                         true).get();
 
-                    for (Object res : results) {
-                        if (res != null)
-                            removed = false;
+                    for (Boolean res : results) {
+                        if (res != null && !res)
+                            removedAll = false;
                     }
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removed);
+            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removedAll);
         }
         catch (ClusterGroupEmptyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -241,7 +243,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
      * operation on a cache with the given name.
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+    private static class GlobalRemoveAllCallable<K,V> implements Callable<Boolean>, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -279,11 +281,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         /**
          * {@inheritDoc}
          */
-        @Override public Object call() throws Exception {
+        @Override public Boolean call() throws Exception {
             GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
             if (cacheAdapter == null)
-                return new Integer(-1);
+                return false;
 
             final GridCacheContext<K, V> ctx = cacheAdapter.context();
 
@@ -293,7 +295,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
             try {
                 if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                    return new Integer(-1); // Ignore this remove request because remove request will be sent again.
+                    return false; // Topology version changed: skip this request, the initiating node will send it again.
 
                 GridDhtCacheAdapter<K, V> dht;
                 GridNearCacheAdapter<K, V> near = null;
@@ -313,24 +315,43 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
                     dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 
-                    for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
-                        if (!locPart.isEmpty() && locPart.primary(topVer)) {
-                            for (GridDhtCacheEntry o : locPart.entries()) {
-                                if (!o.obsoleteOrDeleted())
-                                    dataLdr.removeDataInternal(o.key());
-                            }
+                    for (GridDhtLocalPartition locPart : dht.topology().localPartitions()) {
+                        if (locPart.state() == EVICTED) {
+                            assert locPart.entries().size() == 0;
+
+                            continue;
                         }
-                    }
 
-                    Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer);
+                        if (locPart == null || locPart.state() != OWNING || !locPart.reserve())
+                            return false;
+
+                        try {
+                            if (!locPart.isEmpty() && locPart.primary(topVer)) {
+                                for (GridDhtCacheEntry o : locPart.entries()) {
+                                    if (!ctx.affinity().belongs(ctx.localNode(), locPart.id(), dht.topology().topologyVersion()))
+                                        return false;
+
+                                    if (!o.obsoleteOrDeleted())
+                                        dataLdr.removeDataInternal(o.key());
+                                }
+                            }
 
-                    while (it.hasNext())
-                        dataLdr.removeDataInternal(it.next());
+                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                                ctx.swap().iterator(locPart.id());
 
-                    it = dht.context().swap().swapKeyIterator(true, false, topVer);
+                            if (iter != null) {
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+                                    if (!ctx.affinity().belongs(ctx.localNode(), locPart.id(), dht.topology().topologyVersion()))
+                                        return false;
 
-                    while (it.hasNext())
-                        dataLdr.removeDataInternal(it.next());
+                                    dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+                                }
+                            }
+                        }
+                        finally {
+                            locPart.release();
+                        }
+                    }
                 }
 
                 if (near != null) {
@@ -347,9 +368,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             }
 
             if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                return new Integer(-1);
+                return false;
 
-            return null;
+            return true;
         }
 
         /** {@inheritDoc} */