You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/12 13:30:43 UTC
[24/37] incubator-ignite git commit: #ignite-373: wip.
#ignite-373: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/778a0a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/778a0a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/778a0a72
Branch: refs/heads/ignite-373
Commit: 778a0a722fd066f77c58993c872fc5e409316ec1
Parents: 478b3ee
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue May 12 11:19:31 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue May 12 11:19:31 2015 +0300
----------------------------------------------------------------------
.../GridDistributedCacheAdapter.java | 38 ++++++++++++++++----
.../cache/CacheRemoveAllSelfTest.java | 18 +++++++++-
2 files changed, 49 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/778a0a72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 3a685cc..aa42067 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -142,21 +143,32 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
try {
AffinityTopologyVersion topVer;
+ boolean removed;
+
do {
+ removed = true;
+
+ System.out.println("!!!!Redone remove all");
topVer = ctx.affinity().affinityTopologyVersion();
// Send job to all data nodes.
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
+
if (!nodes.isEmpty()) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
- ctx.closures().callAsyncNoFailover(BROADCAST,
- new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes,
+ Collection<Object> results = ctx.closures().callAsyncNoFailover(BROADCAST,
+ Collections.singleton(new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore())), nodes,
true).get();
+
+ for (Object res : results) {
+ if (res != null)
+ removed = false;
+ }
}
}
- while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
+ while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removed);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -272,6 +284,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
@Override public Object call() throws Exception {
GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+ if (cacheAdapter == null)
+ return new Integer(-1);
+
final GridCacheContext<K, V> ctx = cacheAdapter.context();
ctx.affinity().affinityReadyFuture(topVer).get();
@@ -279,8 +294,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
ctx.gate().enter();
try {
- if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
- return null; // Ignore this remove request because remove request will be sent again.
+ if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) {
+ System.out.println("!!!! have different version");
+ return new Integer(-1); // Ignore this remove request because remove request will be sent again.
+ }
GridDhtCacheAdapter<K, V> dht;
GridNearCacheAdapter<K, V> near = null;
@@ -303,7 +320,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
if (!locPart.isEmpty() && locPart.primary(topVer)) {
for (GridDhtCacheEntry o : locPart.entries()) {
- if (!o.obsoleteOrDeleted())
+ //if (!o.obsoleteOrDeleted())
dataLdr.removeDataInternal(o.key());
}
}
@@ -333,6 +350,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
ctx.gate().leave();
}
+ if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) {
+ System.out.println("!!!! have different version in the end. Local size=" +
+ cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.ALL}) +
+ ", local primary size=" + cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.PRIMARY}) +
+ ", local backup size=" + cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.BACKUP}));
+
+ return new Integer(-1);
+ }
+
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/778a0a72/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
index 9162ac4..c4455b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.*;
import java.util.concurrent.*;
@@ -28,6 +30,11 @@ import java.util.concurrent.atomic.*;
* Test remove all method.
*/
public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
+ @Override
+ protected long getTestTimeout() {
+ return 600000;
+ }
+
/** {@inheritDoc} */
@Override protected int gridCount() {
return 4;
@@ -58,7 +65,16 @@ public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
fut.get();
for (int i = 0; i < igniteId.get(); ++i)
- assertEquals(0, grid(i).cache(null).localSize());
+ assertEquals("Local entries: " + entrySet(grid(i).cache(null).localEntries(CachePeekMode.PRIMARY)) +
+ ". All entries:" +
+ entrySet(grid(i).cache(null).localEntries()), 0, grid(i).cache(null).localSize());
+
+ U.sleep(5000);
+
+ for (int i = 0; i < igniteId.get(); ++i)
+ assertEquals("2 Local entries: " + entrySet(grid(i).cache(null).localEntries(CachePeekMode.PRIMARY)) +
+ ". All entries:" +
+ entrySet(grid(i).cache(null).localEntries()), 0, grid(i).cache(null).localSize());
assertEquals(0, cache.size());
}