You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/15 14:50:33 UTC
[11/17] ignite git commit: IGNITE-1482 - Fixed incorrect cache value for replace() on changing topology.
IGNITE-1482 - Fixed incorrect cache value for replace() on changing topology.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/367d805d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/367d805d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/367d805d
Branch: refs/heads/ignite-gg-10760
Commit: 367d805d10ea071532fe99c6b67cfc97cc8f2fb9
Parents: 91dd7c1
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 14:54:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 14:54:20 2015 +0300
----------------------------------------------------------------------
.../GridDistributedTxRemoteAdapter.java | 8 +--
.../distributed/dht/GridDhtTxPrepareFuture.java | 2 +-
.../IgniteCacheEntryProcessorNodeJoinTest.java | 73 ++++++++++++++++++++
3 files changed, 78 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c930d88..f969737 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -521,7 +521,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (updateNearCache(cacheCtx, txEntry.key(), topVer))
nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
- if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
+ if (!F.isEmpty(txEntry.entryProcessors()))
txEntry.cached().unswap(false);
IgniteBiTuple<GridCacheOperation, CacheObject> res =
@@ -573,12 +573,12 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// Invalidate only for near nodes (backups cannot be invalidated).
if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
- topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE,
+ topVer, null, replicate ? DR_BACKUP : DR_NONE,
near() ? null : explicitVer, CU.subjectId(this, cctx),
resolveTaskName());
else {
cached.innerSet(this, eventNodeId(), nodeId, val, false, false,
- txEntry.ttl(), true, true, topVer, txEntry.filters(),
+ txEntry.ttl(), true, true, topVer, null,
replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
near() ? null : explicitVer, CU.subjectId(this, cctx),
resolveTaskName());
@@ -598,7 +598,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
else if (op == DELETE) {
cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
- topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE,
+ topVer, null, replicate ? DR_BACKUP : DR_NONE,
near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName());
// Keep near entry up to date.
http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 89fc0ae..81cc272 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -842,7 +842,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
IgniteTxEntry e,
Map<Integer, Collection<KeyCacheObject>> map
) {
- if (retVal || !F.isEmpty(e.entryProcessors())) {
+ if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) {
if (map == null)
map = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index af9477e..6b4d473 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -30,7 +30,9 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -201,6 +203,77 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplaceNodeJoin() throws Exception {
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final int started = 6;
+
+ try {
+ int keys = 100;
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+
+ for (int i = 0; i < keys; i++)
+ ignite(0).cache(null).put(i, 0);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ for (int i = 0; i < started; i++) {
+ U.sleep(1_000);
+
+ IgniteEx grid = startGrid(GRID_CNT + i);
+
+ info("Test started grid [idx=" + (GRID_CNT + i) + ", nodeId=" + grid.localNode().id() + ']');
+ }
+ }
+ catch (Exception e) {
+ error.compareAndSet(null, e);
+ }
+ finally {
+ done.set(true);
+ }
+ }
+ }, 1, "starter");
+
+ int updVal = 0;
+
+ try {
+ while (!done.get()) {
+ info("Will put: " + (updVal + 1));
+
+ for (int i = 0; i < keys; i++)
+ assertTrue("Failed [key=" + i + ", oldVal=" + updVal+ ']',
+ ignite(0).cache(null).replace(i, updVal, updVal + 1));
+
+ updVal++;
+ }
+ }
+ finally {
+ fut.get(getTestTimeout());
+ }
+
+ for (int i = 0; i < keys; i++) {
+ for (int g = 0; g < GRID_CNT + started; g++) {
+ Integer val = ignite(g).<Integer, Integer>cache(null).get(i);
+
+ GridCacheEntryEx entry = ((IgniteKernal)grid(g)).internalCache(null).peekEx(i);
+
+ if (updVal != val)
+ info("Invalid value for grid [g=" + g + ", entry=" + entry + ']');
+
+ assertEquals((Integer)updVal, val);
+ }
+ }
+ }
+ finally {
+ for (int i = 0; i < started; i++)
+ stopGrid(GRID_CNT + i);
+ }
+ }
+
/** */
private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
/** */