You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/12 13:30:43 UTC

[24/37] incubator-ignite git commit: #ignite-373: wip.

#ignite-373: wip.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/778a0a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/778a0a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/778a0a72

Branch: refs/heads/ignite-373
Commit: 778a0a722fd066f77c58993c872fc5e409316ec1
Parents: 478b3ee
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue May 12 11:19:31 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue May 12 11:19:31 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedCacheAdapter.java            | 38 ++++++++++++++++----
 .../cache/CacheRemoveAllSelfTest.java           | 18 +++++++++-
 2 files changed, 49 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/778a0a72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 3a685cc..aa42067 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -142,21 +143,32 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         try {
             AffinityTopologyVersion topVer;
 
+            boolean removed;
+
             do {
+                removed = true;
+
+                System.out.println("!!!!Redone remove all");
                 topVer = ctx.affinity().affinityTopologyVersion();
 
                 // Send job to all data nodes.
                 Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
+
                 if (!nodes.isEmpty()) {
                     CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-                    ctx.closures().callAsyncNoFailover(BROADCAST,
-                        new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes,
+                    Collection<Object> results = ctx.closures().callAsyncNoFailover(BROADCAST,
+                        Collections.singleton(new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore())), nodes,
                         true).get();
+
+                    for (Object res : results) {
+                        if (res != null)
+                            removed = false;
+                    }
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
+            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removed);
         }
         catch (ClusterGroupEmptyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -272,6 +284,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         @Override public Object call() throws Exception {
             GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
+            if (cacheAdapter == null)
+                return new Integer(-1);
+
             final GridCacheContext<K, V> ctx = cacheAdapter.context();
 
             ctx.affinity().affinityReadyFuture(topVer).get();
@@ -279,8 +294,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             ctx.gate().enter();
 
             try {
-                if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                    return null; // Ignore this remove request because remove request will be sent again.
+                if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) {
+                    System.out.println("!!!! have different version");
+                    return new Integer(-1); // Ignore this remove request because remove request will be sent again.
+                }
 
                 GridDhtCacheAdapter<K, V> dht;
                 GridNearCacheAdapter<K, V> near = null;
@@ -303,7 +320,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                     for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
                         if (!locPart.isEmpty() && locPart.primary(topVer)) {
                             for (GridDhtCacheEntry o : locPart.entries()) {
-                                if (!o.obsoleteOrDeleted())
+                                //if (!o.obsoleteOrDeleted())
                                     dataLdr.removeDataInternal(o.key());
                             }
                         }
@@ -333,6 +350,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 ctx.gate().leave();
             }
 
+            if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) {
+                System.out.println("!!!! have different version in the end. Local size=" +
+                    cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.ALL}) +
+                    ", local primary size=" + cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.PRIMARY}) +
+                    ", local backup size=" + cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.BACKUP}));
+
+                return new Integer(-1);
+            }
+
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/778a0a72/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
index 9162ac4..c4455b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;
 
 import java.util.concurrent.*;
@@ -28,6 +30,11 @@ import java.util.concurrent.atomic.*;
  * Test remove all method.
  */
 public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
+    @Override
+    protected long getTestTimeout() {
+        return 600000;
+    }
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 4;
@@ -58,7 +65,16 @@ public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
         fut.get();
 
         for (int i = 0; i < igniteId.get(); ++i)
-            assertEquals(0, grid(i).cache(null).localSize());
+            assertEquals("Local entries: " + entrySet(grid(i).cache(null).localEntries(CachePeekMode.PRIMARY)) +
+                ". All entries:" +
+                entrySet(grid(i).cache(null).localEntries()), 0, grid(i).cache(null).localSize());
+
+        U.sleep(5000);
+
+        for (int i = 0; i < igniteId.get(); ++i)
+            assertEquals("2 Local entries: " + entrySet(grid(i).cache(null).localEntries(CachePeekMode.PRIMARY)) +
+                ". All entries:" +
+                entrySet(grid(i).cache(null).localEntries()), 0, grid(i).cache(null).localSize());
 
         assertEquals(0, cache.size());
     }