You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/30 05:54:39 UTC
ignite git commit: ignite-627
Repository: ignite
Updated Branches:
refs/heads/ignite-627 [created] 7aa249870
ignite-627
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7aa24987
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7aa24987
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7aa24987
Branch: refs/heads/ignite-627
Commit: 7aa249870ce7963cda4ad25fb2566b46a4fcf596
Parents: 797e7af
Author: sboikov <sb...@apache.org>
Authored: Fri Oct 26 13:21:07 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Tue Oct 30 08:54:21 2018 +0300
----------------------------------------------------------------------
.../GridNearAtomicAbstractUpdateFuture.java | 293 ++++++++++++++++++-
.../GridNearAtomicSingleUpdateFuture.java | 34 ++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 69 +++--
.../distributed/near/GridNearAtomicCache.java | 198 +------------
.../distributed/near/GridNearCacheEntry.java | 4 +-
...idCacheValueConsistencyAbstractSelfTest.java | 6 -
.../atomic/IgniteCacheAtomicProtocolTest.java | 44 +++
7 files changed, 410 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 983b18a..83d0bb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -36,16 +36,25 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,7 +64,10 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
/**
* Base for near atomic update futures.
@@ -142,6 +154,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
/** Operation result. */
protected GridCacheReturn opRes;
+ /** */
+ protected Map<KeyCacheObject, GridNearCacheEntry> reservedEntries;
+
/**
* Constructor.
*
@@ -242,29 +257,40 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
* Performs future mapping.
*/
public final void map() {
+ map(false);
+ }
+
+ /**
+ * Performs future mapping.
+ *
+ * @param remap Remap flag.
+ */
+ protected final void map(boolean remap) {
AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
if (topVer == null)
- mapOnTopology();
+ mapOnTopology(remap);
else {
topLocked = true;
// Cannot remap.
remapCnt = 1;
- map(topVer);
+ map(topVer, remap);
}
}
/**
* @param topVer Topology version.
+ * @param remap Remap flag.
*/
- protected abstract void map(AffinityTopologyVersion topVer);
+ protected abstract void map(AffinityTopologyVersion topVer, boolean remap);
/**
* Maps future on ready topology.
+ * @param remap Remap flag.
*/
- protected abstract void mapOnTopology();
+ protected abstract void mapOnTopology(boolean remap);
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
@@ -357,7 +383,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
if (futId != null)
cctx.mvcc().removeAtomicFuture(futId);
- super.onDone(retval, err);
+ if (super.onDone(retval, err) && nearEnabled)
+ releaseNearCacheEntries();
}
/** {@inheritDoc} */
@@ -380,6 +407,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
if (futId != null)
cctx.mvcc().removeAtomicFuture(futId);
+ if (nearEnabled)
+ releaseNearCacheEntries();
+
return true;
}
@@ -387,6 +417,25 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
}
/**
+ *
+ */
+ private void releaseNearCacheEntries() {
+ Map<KeyCacheObject, GridNearCacheEntry> reservedEntries0;
+
+ synchronized (this) {
+ if (reservedEntries == null|| reservedEntries.isEmpty())
+ return;
+
+ reservedEntries0 = reservedEntries;
+
+ reservedEntries = null;
+ }
+
+ for (GridNearCacheEntry entry : reservedEntries0.values())
+ entry.releaseEviction();
+ }
+
+ /**
* @param req Request.
* @param res Response.
*/
@@ -471,6 +520,240 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
}
/**
+ * @return Near cache.
+ */
+ protected final GridNearAtomicCache nearCache() {
+ return (GridNearAtomicCache)cctx.dht().near();
+ }
+
+ /**
+ * @param key Key,
+ * @param topVer Update topology version.
+ */
+ protected final void reserveNearCacheEntry(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ assert nearEnabled;
+ assert reservedEntries != null;
+
+ if (cctx.affinityNode() && cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), topVer))
+ return;
+
+ GridNearAtomicCache nearCache = nearCache();
+
+ synchronized (this) {
+ if (reservedEntries.containsKey(key))
+ return;
+
+ while (true) {
+ try {
+ GridNearCacheEntry entry = nearCache.entryExx(key, topVer);
+
+ entry.reserveEviction();
+
+ reservedEntries.put(key, entry);
+
+ return;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry while reserving near cache entry (will retry): " + key);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param req Update request.
+ * @param res Update response.
+ */
+ protected final void processNearAtomicUpdateResponse(
+ GridNearAtomicAbstractUpdateRequest req,
+ GridNearAtomicUpdateResponse res
+ ) {
+ if (F.size(res.failedKeys()) == req.size())
+ return;
+
+ GridNearAtomicCache nearCache = nearCache();
+
+ /*
+ * Choose value to be stored in near cache: first check key is not in failed and not in skipped list,
+ * then check if value was generated on primary node, if not then use value sent in request.
+ */
+
+ Collection<KeyCacheObject> failed = res.failedKeys();
+ List<Integer> nearValsIdxs = res.nearValuesIndexes();
+ List<Integer> skipped = res.skippedIndexes();
+
+ GridCacheVersion ver = res.nearVersion();
+
+ assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
+
+ int nearValIdx = 0;
+
+ String taskName = cctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+ for (int i = 0; i < req.size(); i++) {
+ if (F.contains(skipped, i))
+ continue;
+
+ KeyCacheObject key = req.key(i);
+
+ if (F.contains(failed, key))
+ continue;
+
+ if (cctx.affinity().partitionBelongs(cctx.localNode(), cctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup.
+ GridCacheEntryEx entry = nearCache.peekEx(key);
+
+ if (entry != null && entry.markObsolete(ver))
+ nearCache.removeEntry(entry);
+
+ continue;
+ }
+
+ CacheObject val = null;
+
+ if (F.contains(nearValsIdxs, i)) {
+ val = res.nearValue(nearValIdx);
+
+ nearValIdx++;
+ }
+ else {
+ assert req.operation() != TRANSFORM;
+
+ if (req.operation() != DELETE)
+ val = req.value(i);
+ }
+
+ long ttl = res.nearTtl(i);
+ long expireTime = res.nearExpireTime(i);
+
+ if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE)
+ expireTime = CU.toExpireTime(ttl);
+
+ try {
+ processNearAtomicUpdateResponse(
+ nearCache,
+ topVer,
+ ver,
+ key,
+ val,
+ ttl,
+ expireTime,
+ req.keepBinary(),
+ req.nodeId(),
+ req.subjectId(),
+ taskName,
+ req.operation() == TRANSFORM);
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e));
+ }
+ }
+ }
+
+ /**
+ * @param nearCache Near cache.
+ * @param topVer Update topology version.
+ * @param ver Version.
+ * @param key Key.
+ * @param val Value.
+ * @param ttl TTL.
+ * @param expireTime Expire time.
+ * @param keepBinary Keep binary flag.
+ * @param nodeId Node ID.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param transformedValue {@code True} if transformed value.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void processNearAtomicUpdateResponse(
+ GridNearAtomicCache nearCache,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion ver,
+ KeyCacheObject key,
+ @Nullable CacheObject val,
+ long ttl,
+ long expireTime,
+ boolean keepBinary,
+ UUID nodeId,
+ UUID subjId,
+ String taskName,
+ boolean transformedValue) throws IgniteCheckedException {
+ try {
+ while (true) {
+ GridNearCacheEntry entry = null;
+
+ try {
+ entry = nearCache.entryExx(key, topVer);
+
+ GridCacheOperation op = val != null ? UPDATE : DELETE;
+
+ GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+ ver,
+ nodeId,
+ nodeId,
+ op,
+ val,
+ null,
+ /*write-through*/false,
+ /*read-through*/false,
+ /*retval*/false,
+ keepBinary,
+ /*expiry policy*/null,
+ /*event*/true,
+ /*metrics*/true,
+ /*primary*/false,
+ /*check version*/true,
+ topVer,
+ CU.empty0(),
+ DR_NONE,
+ ttl,
+ expireTime,
+ null,
+ false,
+ false,
+ subjId,
+ taskName,
+ null,
+ null,
+ null,
+ transformedValue);
+
+ boolean release;
+
+ synchronized (this) {
+ GridNearCacheEntry reserved = reservedEntries.remove(key);
+
+ assert reserved == null || reserved == entry;
+
+ release = reserved != null;
+ }
+
+ if (release)
+ entry.releaseEviction();
+
+ if (updRes.removeVersion() != null)
+ nearCache.context().onDeferredDelete(entry, updRes.removeVersion());
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry while updating near cache value (will retry): " + key);
+
+ entry = null;
+ }
+ finally {
+ if (entry != null)
+ entry.touch(topVer);
+ }
+ }
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ // Ignore.
+ }
+ }
+
+ /**
*
*/
static class NodeResult {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 4c0d2db..0d12dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,12 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -39,7 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -48,6 +42,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -124,6 +125,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
this.key = key;
this.val = val;
+
+ reservedEntries = new GridLeanMap<>(1);
}
/** {@inheritDoc} */
@@ -375,7 +378,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(true);
}
});
}
@@ -394,13 +397,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (res.remapTopologyVersion() != null)
return;
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
+ processNearAtomicUpdateResponse(req, res);
}
/** {@inheritDoc} */
- @Override protected void mapOnTopology() {
+ @Override protected void mapOnTopology(boolean remap) {
AffinityTopologyVersion topVer;
if (cache.topology().stopping()) {
@@ -431,7 +432,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(remap);
}
});
}
@@ -440,11 +441,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return;
}
- map(topVer);
+ map(topVer, remap);
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer) {
+ @Override protected void map(AffinityTopologyVersion topVer, boolean remap) {
long futId = cctx.mvcc().nextAtomicId();
Exception err = null;
@@ -479,6 +480,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return;
}
+ if (!remap && nearEnabled)
+ reserveNearCacheEntry(reqState0.req.key(0), topVer);
+
// Optimize mapping for single key.
sendSingleRequest(reqState0.req.nodeId(), reqState0.req);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 28ebfb1..dc98854 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -17,15 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -43,7 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -56,6 +47,16 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
@@ -162,6 +163,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
this.vals = vals;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
+
+ reservedEntries = U.newHashMap(keys.size());
}
/** {@inheritDoc} */
@@ -473,6 +476,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
completeFuture(opRes0, err0, res.futureId());
}
+ /**
+ * @param remapTopVer New topology version.
+ */
private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
assert remapTopVer != null;
@@ -503,7 +509,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(true);
}
});
}
@@ -617,13 +623,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (res.remapTopologyVersion() != null)
return;
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
+ processNearAtomicUpdateResponse(req, res);
}
/** {@inheritDoc} */
- @Override protected void mapOnTopology() {
+ @Override protected void mapOnTopology(boolean remap) {
AffinityTopologyVersion topVer;
if (cache.topology().stopping()) {
@@ -652,7 +656,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology();
+ mapOnTopology(remap);
}
});
}
@@ -661,7 +665,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- map(topVer, remapKeys);
+ map(topVer, remap, remapKeys);
}
/**
@@ -725,15 +729,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer) {
- map(topVer, null);
+ @Override protected void map(AffinityTopologyVersion topVer, boolean remap) {
+ map(topVer, remap, null);
}
/**
* @param topVer Topology version.
+ * @param remap Remap flag.
* @param remapKeys Keys to remap.
*/
- private void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ private void map(AffinityTopologyVersion topVer, boolean remap, @Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -758,12 +763,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (size == 1) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown);
+ singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown, remap);
}
else {
- Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
+ Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(
+ topNodes,
topVer,
futId,
+ remap,
remapKeys,
mappingKnown);
@@ -911,14 +918,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @param topNodes Cache nodes.
* @param topVer Topology version.
* @param futId Future ID.
+ * @param remap Remap flag.
* @param remapKeys Keys to remap.
* @return Mapping.
* @throws Exception If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
+ private Map<UUID, PrimaryRequestState> mapUpdate(
+ Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
Long futId,
+ boolean remap,
@Nullable Collection<KeyCacheObject> remapKeys,
boolean mappingKnown) throws Exception {
Iterator<?> it = null;
@@ -1044,6 +1054,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
mapped.addMapping(nodes);
mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
+
+ if (!remap && nearEnabled)
+ reserveNearCacheEntry(cacheKey, topVer);
}
return pendingMappings;
@@ -1053,10 +1066,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @param topVer Topology version.
* @param futId Future ID.
* @param mappingKnown {@code True} if update mapping is known locally.
+ * @param remap Remap flag.
* @return Request.
* @throws Exception If failed.
*/
- private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+ private PrimaryRequestState mapSingleUpdate(
+ AffinityTopologyVersion topVer,
+ Long futId,
+ boolean mappingKnown,
+ boolean remap)
throws Exception {
Object key = F.first(keys);
@@ -1155,6 +1173,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
conflictExpireTime,
conflictVer);
+ if (!remap && nearEnabled)
+ reserveNearCacheEntry(cacheKey, topVer);
+
return new PrimaryRequestState(req, nodes, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 503c324..d29eb02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -17,20 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
-import java.io.Externalizable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -41,12 +29,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -61,6 +46,18 @@ import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.io.Externalizable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -123,177 +120,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/**
- * @param req Update request.
- * @param res Update response.
- */
- public void processNearAtomicUpdateResponse(
- GridNearAtomicAbstractUpdateRequest req,
- GridNearAtomicUpdateResponse res
- ) {
- if (F.size(res.failedKeys()) == req.size())
- return;
-
- /*
- * Choose value to be stored in near cache: first check key is not in failed and not in skipped list,
- * then check if value was generated on primary node, if not then use value sent in request.
- */
-
- Collection<KeyCacheObject> failed = res.failedKeys();
- List<Integer> nearValsIdxs = res.nearValuesIndexes();
- List<Integer> skipped = res.skippedIndexes();
-
- GridCacheVersion ver = res.nearVersion();
-
- assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
-
- int nearValIdx = 0;
-
- String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
-
- for (int i = 0; i < req.size(); i++) {
- if (F.contains(skipped, i))
- continue;
-
- KeyCacheObject key = req.key(i);
-
- if (F.contains(failed, key))
- continue;
-
- if (ctx.affinity().partitionBelongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup.
- GridCacheEntryEx entry = peekEx(key);
-
- if (entry != null && entry.markObsolete(ver))
- removeEntry(entry);
-
- continue;
- }
-
- CacheObject val = null;
-
- if (F.contains(nearValsIdxs, i)) {
- val = res.nearValue(nearValIdx);
-
- nearValIdx++;
- }
- else {
- assert req.operation() != TRANSFORM;
-
- if (req.operation() != DELETE)
- val = req.value(i);
- }
-
- long ttl = res.nearTtl(i);
- long expireTime = res.nearExpireTime(i);
-
- if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE)
- expireTime = CU.toExpireTime(ttl);
-
- try {
- processNearAtomicUpdateResponse(ver,
- key,
- val,
- ttl,
- expireTime,
- req.keepBinary(),
- req.nodeId(),
- req.subjectId(),
- taskName,
- req.operation() == TRANSFORM);
- }
- catch (IgniteCheckedException e) {
- res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e));
- }
- }
- }
-
- /**
- * @param ver Version.
- * @param key Key.
- * @param val Value.
- * @param ttl TTL.
- * @param expireTime Expire time.
- * @param nodeId Node ID.
- * @param subjId Subject ID.
- * @param taskName Task name.
- * @param transformedValue {@code True} if transformed value.
- * @throws IgniteCheckedException If failed.
- */
- private void processNearAtomicUpdateResponse(
- GridCacheVersion ver,
- KeyCacheObject key,
- @Nullable CacheObject val,
- long ttl,
- long expireTime,
- boolean keepBinary,
- UUID nodeId,
- UUID subjId,
- String taskName,
- boolean transformedValue) throws IgniteCheckedException {
- try {
- while (true) {
- GridCacheEntryEx entry = null;
-
- AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
- try {
- entry = entryEx(key, topVer);
-
- GridCacheOperation op = val != null ? UPDATE : DELETE;
-
- GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
- ver,
- nodeId,
- nodeId,
- op,
- val,
- null,
- /*write-through*/false,
- /*read-through*/false,
- /*retval*/false,
- keepBinary,
- /*expiry policy*/null,
- /*event*/true,
- /*metrics*/true,
- /*primary*/false,
- /*check version*/true,
- topVer,
- CU.empty0(),
- DR_NONE,
- ttl,
- expireTime,
- null,
- false,
- false,
- subjId,
- taskName,
- null,
- null,
- null,
- transformedValue);
-
- if (updRes.removeVersion() != null)
- ctx.onDeferredDelete(entry, updRes.removeVersion());
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry while updating near cache value (will retry): " + key);
-
- entry = null;
- }
- finally {
- if (entry != null)
- entry.touch(topVer);
- }
- }
- }
- catch (GridDhtInvalidPartitionException ignored) {
- // Ignore.
- }
- }
-
- /**
* @param nodeId Sender node ID.
* @param req Dht atomic update request.
* @param res Dht atomic update response.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index c953beb..f6059d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -717,7 +717,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
/**
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- void reserveEviction() throws GridCacheEntryRemovedException {
+ public void reserveEviction() throws GridCacheEntryRemovedException {
lockEntry();
try {
@@ -733,7 +733,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
/**
*
*/
- void releaseEviction() {
+ public void releaseEviction() {
lockEntry();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
index 19f98ff..462ca31 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
@@ -220,9 +220,6 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach
* @throws Exception If failed.
*/
public void testPutConsistencyMultithreaded() throws Exception {
- if (nearEnabled())
- fail("https://issues.apache.org/jira/browse/IGNITE-627");
-
for (int i = 0; i < 20; i++) {
log.info("Iteration: " + i);
@@ -273,9 +270,6 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach
* @throws Exception If failed.
*/
public void testPutRemoveConsistencyMultithreaded() throws Exception {
- if (nearEnabled())
- fail("https://issues.apache.org/jira/browse/IGNITE-627");
-
for (int i = 0; i < 10; i++) {
log.info("Iteration: " + i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7aa24987/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 14c8571..1c6b8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
@@ -876,6 +877,49 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testNearEntryUpdateRace() throws Exception {
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ client = false;
+
+ Ignite srv0 = startGrid(0);
+
+ IgniteCache<Object, Object> srvCache = srv0.cache(TEST_CACHE);
+
+ int key = 0;
+
+ ccfg = null;
+
+ client = true;
+
+ Ignite client1 = startGrid(1);
+
+ IgniteCache<Object, Object> nearCache = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());
+
+ testSpi(srv0).blockMessages(GridNearAtomicUpdateResponse.class, client1.name());
+
+ IgniteInternalFuture<?> nearPutFut = GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ nearCache.put(key, 1);
+ }
+ });
+
+ testSpi(srv0).waitForBlocked();
+
+ srvCache.put(key, 2);
+
+ assertFalse(nearPutFut.isDone());
+
+ testSpi(srv0).stopBlock();
+
+ nearPutFut.get();
+
+ assertEquals(2, nearCache.get(key));
+ }
+
+ /**
* @param expData Expected cache data.
*/
private void checkData(Map<Integer, Integer> expData) {