You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 19:26:09 UTC

[37/50] [abbrv] ignite git commit: IGNITE-1482 - Fixed incorrect cache value for replace() on changing topology.

IGNITE-1482 - Fixed incorrect cache value for replace() on changing topology.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/367d805d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/367d805d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/367d805d

Branch: refs/heads/ignite-1093-2
Commit: 367d805d10ea071532fe99c6b67cfc97cc8f2fb9
Parents: 91dd7c1
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 14:54:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 14:54:20 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedTxRemoteAdapter.java         |  8 +--
 .../distributed/dht/GridDhtTxPrepareFuture.java |  2 +-
 .../IgniteCacheEntryProcessorNodeJoinTest.java  | 73 ++++++++++++++++++++
 3 files changed, 78 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c930d88..f969737 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -521,7 +521,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                     if (updateNearCache(cacheCtx, txEntry.key(), topVer))
                                         nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
 
-                                    if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
+                                    if (!F.isEmpty(txEntry.entryProcessors()))
                                         txEntry.cached().unswap(false);
 
                                     IgniteBiTuple<GridCacheOperation, CacheObject> res =
@@ -573,12 +573,12 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         // Invalidate only for near nodes (backups cannot be invalidated).
                                         if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
                                             cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
-                                                topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE,
+                                                topVer, null, replicate ? DR_BACKUP : DR_NONE,
                                                 near() ? null : explicitVer, CU.subjectId(this, cctx),
                                                 resolveTaskName());
                                         else {
                                             cached.innerSet(this, eventNodeId(), nodeId, val, false, false,
-                                                txEntry.ttl(), true, true, topVer, txEntry.filters(),
+                                                txEntry.ttl(), true, true, topVer, null,
                                                 replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
                                                 near() ? null : explicitVer, CU.subjectId(this, cctx),
                                                 resolveTaskName());
@@ -598,7 +598,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                     }
                                     else if (op == DELETE) {
                                         cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
-                                            topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE,
+                                            topVer, null, replicate ? DR_BACKUP : DR_NONE,
                                             near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName());
 
                                         // Keep near entry up to date.

http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 89fc0ae..81cc272 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -842,7 +842,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         IgniteTxEntry e,
         Map<Integer, Collection<KeyCacheObject>> map
     ) {
-        if (retVal || !F.isEmpty(e.entryProcessors())) {
+        if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) {
             if (map == null)
                 map = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index af9477e..6b4d473 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -30,7 +30,9 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -201,6 +203,77 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplaceNodeJoin() throws Exception {
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final int started = 6;
+
+        try {
+            int keys = 100;
+
+            final AtomicBoolean done = new AtomicBoolean(false);
+
+            for (int i = 0; i < keys; i++)
+                ignite(0).cache(null).put(i, 0);
+
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        for (int i = 0; i < started; i++) {
+                            U.sleep(1_000);
+
+                            IgniteEx grid = startGrid(GRID_CNT + i);
+
+                            info("Test started grid [idx=" + (GRID_CNT + i) + ", nodeId=" + grid.localNode().id() + ']');
+                        }
+                    }
+                    catch (Exception e) {
+                        error.compareAndSet(null, e);
+                    }
+                    finally {
+                        done.set(true);
+                    }
+                }
+            }, 1, "starter");
+
+            int updVal = 0;
+
+            try {
+                while (!done.get()) {
+                    info("Will put: " + (updVal + 1));
+
+                    for (int i = 0; i < keys; i++)
+                        assertTrue("Failed [key=" + i + ", oldVal=" + updVal+ ']',
+                            ignite(0).cache(null).replace(i, updVal, updVal + 1));
+
+                    updVal++;
+                }
+            }
+            finally {
+                fut.get(getTestTimeout());
+            }
+
+            for (int i = 0; i < keys; i++) {
+                for (int g = 0; g < GRID_CNT + started; g++) {
+                    Integer val = ignite(g).<Integer, Integer>cache(null).get(i);
+
+                    GridCacheEntryEx entry = ((IgniteKernal)grid(g)).internalCache(null).peekEx(i);
+
+                    if (updVal != val)
+                        info("Invalid value for grid [g=" + g + ", entry=" + entry + ']');
+
+                    assertEquals((Integer)updVal, val);
+                }
+            }
+        }
+        finally {
+            for (int i = 0; i < started; i++)
+                stopGrid(GRID_CNT + i);
+        }
+    }
+
     /** */
     private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
         /** */