You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/14 14:43:12 UTC
[5/7] ignite git commit: Merge master into ignite-3477
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1937a5f,8c3c5d1..fc1d9e3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -768,108 -765,92 +770,108 @@@ public abstract class GridCacheAdapter<
PeekModes modes = parsePeekModes(peekModes, false);
- try {
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+ KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+
+ CacheObject cacheVal = null;
- CacheObject cacheVal = null;
+ if (!ctx.isLocal()) {
+ AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
- if (!ctx.isLocal()) {
- AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+ int part = ctx.affinity().partition(cacheKey);
- int part = ctx.affinity().partition(cacheKey);
+ boolean nearKey;
- boolean nearKey;
+ if (!(modes.near && modes.primary && modes.backup)) {
- boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer);
++ boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer);
+
+ if (keyPrimary) {
+ if (!modes.primary)
+ return null;
- if (!(modes.near && modes.primary && modes.backup)) {
- boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer);
+ nearKey = false;
+ }
+ else {
- boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part, topVer);
++ boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
- if (keyPrimary) {
- if (!modes.primary)
+ if (keyBackup) {
+ if (!modes.backup)
return null;
nearKey = false;
}
else {
- boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
-
- if (keyBackup) {
- if (!modes.backup)
- return null;
-
- nearKey = false;
- }
- else {
- if (!modes.near)
- return null;
-
- nearKey = true;
+ if (!modes.near)
+ return null;
- // Swap and offheap are disabled for near cache.
- modes.offheap = false;
- modes.swap = false;
- }
- }
- }
- else {
- nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
+ nearKey = true;
- if (nearKey) {
// Swap and offheap are disabled for near cache.
modes.offheap = false;
- modes.swap = false;
}
}
+ }
+ else {
- nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer);
++ nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
- if (nearKey && !ctx.isNear())
- return null;
+ if (nearKey) {
+ // Swap and offheap are disabled for near cache.
+ modes.offheap = false;
+ }
+ }
- if (modes.heap) {
- GridCacheEntryEx e = nearKey ? peekEx(cacheKey) :
- (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
+ if (nearKey && !ctx.isNear())
+ return null;
- if (e != null) {
- cacheVal = e.peek(modes.heap, modes.offheap, modes.swap, topVer, plc);
+ GridCacheEntryEx e;
+ GridCacheContext ctx0;
- modes.offheap = false;
- modes.swap = false;
- }
+ while (true) {
+ if (nearKey) {
+ ctx0 = context();
+ e = peekEx(key);
+ }
+ else {
+ ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
+ e = modes.offheap ? ctx0.cache().entryEx(key) : ctx0.cache().peekEx(key);
}
- if (modes.offheap || modes.swap) {
- GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
+ if (e != null) {
+ try {
+ cacheVal = e.peek(modes.heap, modes.offheap, topVer, plc);
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry during 'peek': " + key);
- cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
+ continue;
+ }
+ finally {
+ ctx0.evicts().touch(e, null);
+ }
}
+
+ break;
}
- else
- cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap, modes.swap, plc);
+ }
+ else {
+ while (true) {
+ try {
+ cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap, plc);
- Object val = ctx.unwrapBinaryIfNeeded(cacheVal, ctx.keepBinary(), false);
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry during 'peek': " + key);
- return (V)val;
+ // continue
+ }
+ }
}
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry during 'peek': " + key);
- return null;
- }
+ Object val = ctx.unwrapBinaryIfNeeded(cacheVal, ctx.keepBinary(), false);
+
+ return (V)val;
}
/**
@@@ -1520,31 -1482,31 +1523,31 @@@
final boolean intercept = ctx.config().getInterceptor() != null;
IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain(
- new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() {
- @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f)
+ new CX1<IgniteInternalFuture<EntryGetResult>, CacheEntry<K, V>>() {
+ @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<EntryGetResult> f)
throws IgniteCheckedException {
- T2<V, GridCacheVersion> t = f.get();
+ EntryGetResult t = f.get();
- K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
+ K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
- CacheEntry val = t != null ? new CacheEntryImplEx<>(
- key,
- t.value(),
- t.version())
- : null;
+ CacheEntry val = t != null ? new CacheEntryImplEx<>(
+ key,
- t.get1(),
- t.get2())
++ t.value(),
++ t.version())
+ : null;
- if (intercept) {
- V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
+ if (intercept) {
+ V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
- return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.get2() : null) : null;
- return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null;
++ return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null;
+ }
+ else
+ return val;
}
- else
- return val;
- }
- });
+ });
if (statsEnabled)
- fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start));
+ fut.listen(new UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start));
return fr;
}
@@@ -1639,22 -1587,9 +1642,22 @@@
final long start = statsEnabled ? System.nanoTime() : 0L;
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ String taskName = ctx.kernalContext().job().currentTaskName();
+
- IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut =
- (IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>)((IgniteInternalFuture)getAllAsync(
+ IgniteInternalFuture<Map<K, EntryGetResult>> fut =
- (IgniteInternalFuture<Map<K, EntryGetResult>>)
- ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true));
++ (IgniteInternalFuture<Map<K, EntryGetResult>>)((IgniteInternalFuture)getAllAsync(
+ keys,
+ !ctx.config().isReadFromBackup(),
+ /*skip tx*/false,
+ opCtx != null ? opCtx.subjectId() : null,
+ taskName,
+ !(opCtx != null && opCtx.isKeepBinary()),
+ opCtx != null && opCtx.recovery(),
+ /*skip vals*/false,
+ /*can remap*/true,
+ /*need ver*/true));
final boolean intercept = ctx.config().getInterceptor() != null;
@@@ -1732,7 -1667,9 +1735,7 @@@
*/
@SuppressWarnings("IfMayBeConditional")
private Collection<CacheEntry<K, V>> interceptGetEntries(
- @Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) {
+ @Nullable Collection<? extends K> keys, Map<K, EntryGetResult> map) {
- Map<K, CacheEntry<K, V>> res;
-
if (F.isEmpty(keys)) {
assert map.isEmpty();
@@@ -1957,23 -1890,14 +1966,23 @@@
}
if (tx == null || tx.implicit()) {
- Map<KeyCacheObject, GridCacheVersion> misses = null;
- try {
- final AffinityTopologyVersion topVer = tx == null ?
- (canRemap ?
- ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
- tx.topologyVersion();
++ Map<KeyCacheObject, EntryGetResult> misses = null;
+ final AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ?
+ ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
+
+ try {
int keysSize = keys.size();
+ GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture();
+
+ Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null;
+
+ if (ex != null)
+ return new GridFinishedFuture<>(ex);
+
final Map<K1, V1> map = keysSize == 1 ?
(Map<K1, V1>)new IgniteBiTuple<>() :
U.<K1, V1>newHashMap(keysSize);
@@@ -1992,33 -1920,55 +2001,53 @@@
}
try {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
- null,
- null,
- /*update-metrics*/!skipVals,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
-
- if (res == null) {
- if (storeEnabled) {
- GridCacheVersion ver = entry.version();
+ EntryGetResult res;
+
+ boolean evt = !skipVals;
+ boolean updateMetrics = !skipVals;
+
+ if (storeEnabled) {
- res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
- updateMetrics,
++ res = entry.innerGetAndReserveForLoad(updateMetrics,
+ evt,
+ subjId,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+
+ assert res != null;
+ if (res.value() == null) {
if (misses == null)
misses = new HashMap<>();
- misses.put(key, ver);
+ misses.put(key, res);
+
+ res = null;
}
- else
- ctx.evicts().touch(entry, topVer);
}
else {
+ res = entry.innerGetVersioned(
+ null,
+ null,
- ctx.isSwapOrOffheapEnabled(),
+ /*unmarshal*/true,
+ updateMetrics,
+ evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+
+ if (res == null)
+ ctx.evicts().touch(entry, topVer);
+ }
+
+ if (res != null) {
ctx.addResult(map,
key,
- res.get1(),
+ res,
skipVals,
keepCacheObjects,
deserializeBinary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 26e4ed3,f47e9f3..8e6a9ec
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@@ -305,9 -324,11 +306,9 @@@ public interface GridCacheEntryEx
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
- boolean readSwap,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
UUID subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d28ea25,2237e22..846c633
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -103,6 -99,15 +102,9 @@@ public abstract class GridCacheMapEntr
private static final byte IS_UNSWAPPED_MASK = 0x02;
/** */
- private static final byte IS_OFFHEAP_PTR_MASK = 0x04;
-
- /** */
- private static final byte IS_SWAPPING_REQUIRED = 0x08;
-
- /** */
- private static final byte IS_EVICT_DISABLED = 0x10;
++ private static final byte IS_EVICT_DISABLED = 0x04;
+
+ /** */
public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator();
/**
@@@ -479,24 -776,56 +481,51 @@@
taskName,
expirePlc,
false,
- keepBinary);
+ keepBinary,
+ false,
+ null);
+ }
+
+ /** {@inheritDoc} */
- @Override public EntryGetResult innerGetAndReserveForLoad(boolean readSwap,
- boolean updateMetrics,
++ @Override public EntryGetResult innerGetAndReserveForLoad(boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean keepBinary,
+ @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ return (EntryGetResult)innerGet0(
+ /*ver*/null,
+ /*tx*/null,
- readSwap,
+ /*readThrough*/false,
+ evt,
+ updateMetrics,
- /*tmp*/false,
+ subjId,
+ /*transformClo*/null,
+ taskName,
+ expiryPlc,
+ true,
+ keepBinary,
+ /*reserve*/true,
+ readerArgs);
}
/** {@inheritDoc} */
- @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ @Override public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
- boolean readSwap,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
UUID subjId,
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary)
+ boolean keepBinary,
+ @Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException {
- return (T2<CacheObject, GridCacheVersion>)innerGet0(
- return (EntryGetResult)innerGet0(ver,
++ return (EntryGetResult)innerGet0(
+ ver,
tx,
- readSwap,
false,
evt,
updateMetrics,
@@@ -624,14 -965,28 +658,27 @@@
// Cache version for optimistic check.
startVer = ver;
- }
- if (ret != null) {
- assert !obsolete;
- assert !deferred;
+ addReaderIfNeed(readerArgs);
+
+ if (ret != null) {
- assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
+ assert !obsolete;
+ assert !deferred;
+
+ // If return value is consistent, then done.
+ res = retVer ? entryGetResult(ret, resVer, false) : ret;
+ }
+ else if (reserveForLoad && !obsolete) {
+ assert !readThrough;
+ assert retVer;
+
+ boolean reserve = !evictionDisabled();
+
+ if (reserve)
+ flags |= IS_EVICT_DISABLED;
- // If return value is consistent, then done.
- return retVer ? new T2<>(ret, resVer) : ret;
+ res = entryGetResult(null, resVer, reserve);
+ }
}
if (obsolete) {
@@@ -682,8 -1046,20 +734,10 @@@
update(ret, expTime, ttl, nextVer, true);
- if (hadValPtr && cctx.offheapTiered()) {
- if (log.isTraceEnabled()) {
- log.trace("innerGet removeOffheap [key=" + key +
- ", entry=" + System.identityHashCode(this) +
- ", ptr=" + offHeapPointer() + ']');
- }
-
- cctx.swap().removeOffheap(key);
- }
-
if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
deletedUnlocked(false);
+
+ assert readerArgs == null;
}
if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
@@@ -2170,8 -3077,21 +2238,9 @@@
value(null);
ver = newVer;
+ flags &= ~IS_EVICT_DISABLED;
- if (log.isTraceEnabled()) {
- log.trace("invalidate releaseSwap [key=" + key +
- ", entry=" + System.identityHashCode(this) +
- ", val=" + val +
- ", ptr=" + offHeapPointer() +
- ']');
- }
-
- releaseSwap();
-
- clearIndex(val);
+ removeValue();
onInvalidate();
}
@@@ -2249,9 -3168,10 +2318,10 @@@
ttlAndExpireTimeExtras(ttl, expireTime);
this.ver = ver;
+ flags &= ~IS_EVICT_DISABLED;
- if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
- cctx.ttl().addTrackedEntry(this);
+ if (trackNear && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()))
+ cctx.ttl().addTrackedEntry((GridNearCacheEntry)this);
}
/**
@@@ -2421,18 -3394,8 +2491,18 @@@
}
}
+ /** {@inheritDoc} */
+ @Nullable @Override public CacheObject peek(@Nullable IgniteCacheExpiryPolicy plc)
+ throws GridCacheEntryRemovedException, IgniteCheckedException {
+ IgniteInternalTx tx = cctx.tm().localTxx();
+
+ AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion();
+
+ return peek(true, false, topVer, plc);
+ }
+
/**
- * TODO: GG-4009: do we need to generate event and invalidate value?
+ * TODO: IGNITE-3500: do we need to generate event and invalidate value?
*
* @return {@code true} if expired.
* @throws IgniteCheckedException In case of failure.
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 46a20ab,86dd4ea..f339f46
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -57,8 -54,8 +56,9 @@@ import org.apache.ignite.internal.Ignit
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
- import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
+ import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
@@@ -222,16 -210,19 +220,18 @@@ public class GridCachePartitionExchange
}
}
- assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() :
- assert
- evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
++ assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
"Node joined with smaller-than-local " +
-- "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
++ "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
- exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+ exchId = exchangeId(n.id(),
+ affinityTopologyVersion(evt),
+ evt.type());
- exchFut = exchangeFuture(exchId, e, null, null);
+ exchFut = exchangeFuture(exchId, evt, cache,null, null);
}
else {
- DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
+ DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
@@@ -260,11 -251,10 +260,10 @@@
}
}
- //todo think about refactoring
- if (!F.isEmpty(valid)) {
+ if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, e, valid, null);
+ exchFut = exchangeFuture(exchId, evt, cache, valid, null);
}
}
else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
@@@ -278,14 -268,8 +277,14 @@@
}
}
else
- exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+ exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
}
+ else if (customEvt.customMessage() instanceof StartFullSnapshotAckDiscoveryMessage
+ && !((StartFullSnapshotAckDiscoveryMessage)customEvt.customMessage()).hasError()) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
++ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
- exchFut = exchangeFuture(exchId, e, null, null);
++ exchFut = exchangeFuture(exchId, evt, null, null, null);
+ }
}
if (exchId != null) {
@@@ -1303,8 -1291,11 +1307,12 @@@
}
else {
if (msg.client()) {
- final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+ final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(
- msg.exchangeId(), null, null, null);
++ msg.exchangeId(),
+ null,
+ null,
+ null,
+ null);
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 987d696,b016883..c18dbcf
mode 100644,100755..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -880,48 -857,33 +891,59 @@@ public class GridCacheProcessor extend
assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
+
+ if (!ctx.clientNode() && !ctx.isDaemon())
+ addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
+
+ }
+
+ /**
+ * @param timeout Cleanup timeout.
+ */
+ private void addRemovedItemsCleanupTask(long timeout) {
+ ctx.timeout().addTimeoutObject(new RemovedItemsCleanupTask(timeout));
}
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- for (String cacheName : stopSeq) {
- GridCacheAdapter<?, ?> cache = stoppedCaches.remove(maskNull(cacheName));
+ /**
+ *
+ */
+ private void checkConsistency() throws IgniteCheckedException {
+ if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+ for (ClusterNode n : ctx.discovery().remoteNodes()) {
+ if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
+ continue;
- if (cache != null)
- stopCache(cache, cancel);
- }
+ checkTransactionConfiguration(n);
- for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) {
- if (cache == stoppedCaches.remove(maskNull(cache.name())))
- stopCache(cache, cancel);
+ DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+ DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+
+ CU.checkAttributeMismatch(
+ log, null, n.id(), "deploymentMode", "Deployment mode",
+ locDepMode, rmtDepMode, true);
+
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+
+ if (rmtCfg != null) {
+ CacheConfiguration locCfg = desc.cacheConfiguration();
+
+ checkCache(locCfg, rmtCfg, n);
+
+ // Check plugin cache configurations.
+ CachePluginManager pluginMgr = desc.pluginManager();
+
+ pluginMgr.validateRemotes(rmtCfg, n);
+ }
+ }
+ }
}
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ stopCaches(cancel);
List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index ca71f51,9c4e4ef..eb23c43
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@@ -30,8 -30,8 +30,9 @@@ import java.util.concurrent.locks.Reent
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@@ -41,10 -41,7 +42,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
- import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index fba1877,58dbbcf..ac6eee3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@@ -650,10 -657,10 +654,11 @@@ public abstract class GridDhtCacheAdapt
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ boolean recovery
) {
return getAllAsync0(keys,
+ readerArgs,
readThrough,
/*don't check local tx. */false,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 2c3435d,d0d801a..5fe0ef4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@@ -392,32 -390,22 +395,23 @@@ public final class GridDhtGetFuture<K,
txFut.markInitialized();
}
- IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
+ IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut;
if (txFut == null || txFut.isDone()) {
- if (tx == null) {
- fut = cache().getDhtAllAsync(
- keys.keySet(),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true,
- recovery);
- }
- else {
- fut = tx.getAllAsync(cctx,
- null,
- keys.keySet(),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false);
- }
+ fut = cache().getDhtAllAsync(
+ keys.keySet(),
+ readerArgs,
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
- /*can remap*/true);
++ /*can remap*/true,
++ recovery);
}
else {
+ final ReaderArguments args = readerArgs;
+
// If we are here, then there were active transactions for some entries
// when we were adding the reader. In that case we must wait for those
// transactions to complete.
@@@ -428,27 -416,15 +422,16 @@@
if (e != null)
throw new GridClosureException(e);
- if (tx == null) {
- return cache().getDhtAllAsync(
- keys.keySet(),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true,
- recovery);
- }
- else {
- return tx.getAllAsync(cctx,
- null,
- keys.keySet(),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false);
- }
+ return cache().getDhtAllAsync(
+ keys.keySet(),
+ args,
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
- /*can remap*/true);
++ /*can remap*/true,
++ recovery);
}
}
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 81d2570,33f4661..9cc69b5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@@ -350,32 -348,22 +353,23 @@@ public final class GridDhtGetSingleFutu
}
}
- IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
+ IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut;
if (rdrFut == null || rdrFut.isDone()) {
- if (tx == null) {
- fut = cache().getDhtAllAsync(
- Collections.singleton(key),
- readThrough,
- subjId,
- taskName,
- expiryPlc,
- skipVals,
- /*can remap*/true,
- recovery);
- }
- else {
- fut = tx.getAllAsync(cctx,
- null,
- Collections.singleton(key),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false);
- }
+ fut = cache().getDhtAllAsync(
+ Collections.singleton(key),
+ readerArgs,
+ readThrough,
+ subjId,
+ taskName,
+ expiryPlc,
+ skipVals,
- /*can remap*/true);
++ /*can remap*/true,
++ recovery);
}
else {
+ final ReaderArguments args = readerArgs;
+
rdrFut.listen(
new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
@@@ -397,20 -384,7 +390,8 @@@
taskName,
expiryPlc,
skipVals,
- /*can remap*/true);
+ /*can remap*/true,
+ recovery);
- }
- else {
- fut0 = tx.getAllAsync(cctx,
- null,
- Collections.singleton(key),
- /*deserialize binary*/false,
- skipVals,
- /*keep cache objects*/true,
- /*skip store*/!readThrough,
- false
- );
- }
fut0.listen(createGetFutureListener());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 9b30593,9f8498a..b1fe6ec
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@@ -49,24 -44,23 +49,28 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridCircularBuffer;
+ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.T2;
+ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
+ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
+import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
@@@ -160,22 -149,9 +169,24 @@@ public class GridDhtLocalPartition impl
int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
- rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
+ rmvQueueMaxSize = U.ceilPow2(delQueueSize);
+
+ rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
+
+ try {
+ store = cctx.offheap().createCacheDataStore(id);
+ }
+ catch (IgniteCheckedException e) {
+ // TODO ignite-db
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @return Data store.
+ */
+ public CacheDataStore dataStore() {
+ return store;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index ac3e2c8,bdd84b0..4f8de4e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@@ -17,10 -17,8 +17,9 @@@
package org.apache.ignite.internal.processors.cache.distributed.dht;
- import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 322bbe3,84ff96b..04188fd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -30,18 -30,15 +30,19 @@@ import java.util.Set
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@@ -503,86 -468,70 +504,86 @@@ class GridDhtPartitionTopologyImpl impl
ClusterNode loc = cctx.localNode();
- U.writeLock(lock);
+ cctx.shared().database().checkpointReadLock();
- try {
- GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+ synchronized (cctx.shared().exchange().interruptLock()) {
+ if (Thread.currentThread().isInterrupted())
+ throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
- if (stopping)
- return;
+ try {
+ U.writeLock(lock);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ cctx.shared().database().checkpointReadUnlock();
- assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
- topVer + ", exchId=" + exchId + ']';
+ throw e;
+ }
- if (exchId.isLeft())
- removeNode(exchId.nodeId());
+ try {
+ GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ if (stopping)
+ return;
- if (log.isDebugEnabled())
- log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+ assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
+ topVer + ", exchId=" + exchId + ']';
- long updateSeq = this.updateSeq.incrementAndGet();
+ if (exchId.isLeft())
+ removeNode(exchId.nodeId());
- ClusterNode oldest = currentCoordinator();
- cntrMap.clear();
++ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
- // If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
- if (node2part == null) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+ if (log.isDebugEnabled())
+ log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
- if (log.isDebugEnabled())
- log.debug("Created brand new full topology map on oldest node [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
- }
- else if (!node2part.valid()) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+ long updateSeq = this.updateSeq.incrementAndGet();
- if (log.isDebugEnabled())
- log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
- node2part + ']');
+ cntrMap.clear();
+
+ // If this is the oldest node.
+ if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+ if (node2part == null) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+
+ if (log.isDebugEnabled())
+ log.debug("Created brand new full topology map on oldest node [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
+ }
+ else if (!node2part.valid()) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+
+ if (log.isDebugEnabled())
+ log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+ node2part + ']');
+ }
+ else if (!node2part.nodeId().equals(loc.id())) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+
+ if (log.isDebugEnabled())
+ log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
+ }
}
- else if (!node2part.nodeId().equals(loc.id())) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- if (log.isDebugEnabled())
- log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
+ if (affReady)
+ initPartitions0(exchFut, updateSeq);
+ else {
+ List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
+
+ createPartitions(aff, updateSeq);
}
- }
- if (affReady)
- initPartitions0(exchFut, updateSeq);
- else {
- List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
+ consistencyCheck();
- createPartitions(aff, updateSeq);
+ if (log.isDebugEnabled())
+ log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+ fullMapString() + ']');
}
+ finally {
+ lock.writeLock().unlock();
- consistencyCheck();
-
- if (log.isDebugEnabled())
- log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
- fullMapString() + ']');
- }
- finally {
- lock.writeLock().unlock();
+ cctx.shared().database().checkpointReadUnlock();
+ }
}
// Wait for evictions.
@@@ -761,14 -692,12 +762,14 @@@
try {
loc = locParts.get(p);
+ state = loc != null ? loc.state() : null;
+
- boolean belongs = cctx.affinity().localNode(p, topVer);
+ boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
- if (loc != null && loc.state() == EVICTED) {
+ if (loc != null && state == EVICTED) {
locParts.set(p, loc = null);
- if (!belongs)
+ if (!treatAllPartAsLoc && !belongs)
throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@@ -835,9 -750,10 +836,10 @@@
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
- if (part != null)
+ if (part != null && part.state().active())
list.add(part);
}
+
return list;
}
@@@ -1065,11 -981,9 +1067,11 @@@
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(
- @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
++ @Override public GridDhtPartitionMap2 update(
+ @Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
- @Nullable Map<Integer, Long> cntrMap) {
+ @Nullable Map<Integer, T2<Long, Long>> cntrMap
+ ) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6c4da68,519239a..0ef8bb8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@@ -353,10 -341,10 +354,11 @@@ public class GridPartitionedGetFuture<K
topVer,
subjId,
taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc != null ? expiryPlc.forCreate() : -1L,
expiryPlc != null ? expiryPlc.forAccess() : -1L,
skipVals,
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled(),
+ recovery);
add(fut); // Append new future.
@@@ -461,9 -452,11 +464,9 @@@
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ getRes = entry.innerGetVersioned(
null,
null,
- /*swap*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index ea69743,a3f6b72..0da3a44
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@@ -316,10 -300,10 +317,11 @@@ public class GridPartitionedSingleGetFu
topVer,
subjId,
taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc != null ? expiryPlc.forCreate() : -1L,
expiryPlc != null ? expiryPlc.forAccess() : -1L,
skipVals,
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled(),
+ recovery);
}
try {
@@@ -388,9 -375,11 +390,9 @@@
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
- /*swap*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 72489fd,cebf4ae..ef8150c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -555,11 -547,8 +555,10 @@@ public class GridDhtAtomicCache<K, V> e
/** {@inheritDoc} */
@Override protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
throws IgniteCheckedException {
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
return getAllAsyncInternal(keys,
!ctx.config().isReadFromBackup(),
- true,
null,
ctx.kernalContext().job().currentTaskName(),
deserializeBinary,
@@@ -1607,10 -1584,12 +1605,10 @@@
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ getRes = entry.innerGetVersioned(
null,
null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
+ /*update-metrics*/false,
/*event*/!skipVals,
subjId,
null,
@@@ -3209,10 -3186,11 +3212,11 @@@
* @param nodeId Sender node ID.
* @param res Near atomic update response.
*/
- @SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response [futId" + res.futureVersion() +
+ msgLog.debug("Received near atomic update response " +
- "[futId=" + res.futureVersion() + ", node=" + nodeId + ']');
++ "[futId=" + res.futureVersion() +
+ ", node=" + nodeId + ']');
res.nodeId(ctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d5e8389,e1e0ec2..d86dc91
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -476,9 -473,11 +478,9 @@@ public class GridDhtColocatedCache<K, V
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ getRes = entry.innerGetVersioned(
null,
null,
- /*swap*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 358ec8f,79ca108..31cff03
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -911,37 -911,38 +917,38 @@@ public final class GridDhtColocatedLock
!topLocked &&
(tx == null || !tx.hasRemoteLocks());
- first = false;
- }
-
- req = new GridNearLockRequest(
- cctx.cacheId(),
- topVer,
- cctx.nodeId(),
- threadId,
- futId,
- lockVer,
- inTx(),
- implicitTx(),
- implicitSingleTx(),
- read,
- retval,
- isolation(),
- isInvalidate(),
- timeout,
- mappedKeys.size(),
- inTx() ? tx.size() : mappedKeys.size(),
- inTx() && tx.syncMode() == FULL_SYNC,
- inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0,
- read ? createTtl : -1L,
+ first = false;
+ }
+
+ req = new GridNearLockRequest(
+ cctx.cacheId(),
+ topVer,
+ cctx.nodeId(),
+ threadId,
+ futId,
+ lockVer,
+ inTx(),
+ implicitTx(),
+ implicitSingleTx(),
+ read,
+ retval,
+ isolation(),
+ isInvalidate(),
+ timeout,
+ mappedKeys.size(),
+ inTx() ? tx.size() : mappedKeys.size(),
+ inTx() && tx.syncMode() == FULL_SYNC,
+ inTx() ? tx.subjectId() : null,
+ inTx() ? tx.taskNameHash() : 0,
- read ? accessTtl : -1L,
++ read ? createTtl : -1L,
+ read ? accessTtl : -1L,
- skipStore,
- keepBinary,
- clientFirst,
- cctx.deploymentEnabled());
+ skipStore,
+ keepBinary,
+ clientFirst,
+ cctx.deploymentEnabled());
- mapping.request(req);
- }
+ mapping.request(req);
+ }
distributedKeys.add(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 97d768a,9942423..b80ad04
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -272,24 -288,22 +272,24 @@@ class GridDhtPartitionSupplier
boolean partMissing = false;
if (phase == SupplyContextPhase.NEW)
- phase = SupplyContextPhase.ONHEAP;
+ phase = SupplyContextPhase.OFFHEAP;
+
+ if (phase == SupplyContextPhase.OFFHEAP) {
+ IgniteRebalanceIterator iter;
+
+ if (sctx == null || sctx.entryIt == null) {
+ iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
- if (phase == SupplyContextPhase.ONHEAP) {
- Iterator<GridCacheMapEntry> entIt = sctx != null ?
- (Iterator<GridCacheMapEntry>)sctx.entryIt : loc.allEntries().iterator();
+ if (!iter.historical())
+ s.clean(part);
+ }
+ else
+ iter = (IgniteRebalanceIterator)sctx.entryIt;
- while (entIt.hasNext()) {
+ while (iter.hasNext()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition, so we send '-1' partition and move on.
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
s.missed(part);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index af0085d,46fb144..c33dc7b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -46,9 -44,7 +46,9 @@@ import org.apache.ignite.internal.Ignit
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
- import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@@ -488,9 -455,9 +502,11 @@@ public class GridDhtPartitionsExchangeF
assert !dummy && !forcePreload : this;
try {
+ discoCache.updateAlives(cctx.discovery());
+
+ AffinityTopologyVersion topVer = topologyVersion();
+
- srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVer));
+ srvNodes = new ArrayList<>(discoCache.serverNodes());
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@@ -1324,13 -1118,10 +1340,12 @@@
* Cleans up resources to avoid excessive memory usage.
*/
public void cleanUp() {
- topSnapshot.set(null);
singleMsgs.clear();
fullMsgs.clear();
+ changeGlobalStateExceptions.clear();
crd = null;
partReleaseFut = null;
+ changeGlobalStateE = null;
}
/**
@@@ -1402,11 -1194,10 +1418,13 @@@
if (crd.isLocal()) {
if (remaining.remove(node.id())) {
- updatePartitionSingleMap(node, msg);
+ updateSingleMap = true;
+
+ pendingSingleUpdates++;
+ if (exchangeOnChangeGlobalState && msg.getException() != null)
+ changeGlobalStateExceptions.put(node.id(), msg.getException());
+
allReceived = remaining.isEmpty();
}
}
@@@ -1414,8 -1205,42 +1432,42 @@@
singleMsgs.put(node, msg);
}
- if (allReceived)
+ if (updateSingleMap) {
+ try {
- updatePartitionSingleMap(msg);
++ updatePartitionSingleMap(node, msg);
+ }
+ finally {
+ synchronized (mux) {
+ assert pendingSingleUpdates > 0;
+
+ pendingSingleUpdates--;
+
+ if (pendingSingleUpdates == 0)
+ mux.notifyAll();
+ }
+ }
+ }
+
+ if (allReceived) {
+ awaitSingleMapUpdates();
+
onAllReceived();
+ }
+ }
+
+ /**
+ *
+ */
+ private void awaitSingleMapUpdates() {
+ synchronized (mux) {
+ try {
+ while (pendingSingleUpdates > 0)
+ U.wait(mux);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+ }
+ }
}
/**
@@@ -1541,10 -1272,10 +1593,10 @@@
try {
assert crd.isLocal();
- if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+ if (!crd.equals(discoCache.serverNodes().get(0))) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
- cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
+ cacheCtx.topology().beforeExchange(this, !centralizedAff);
}
}
@@@ -1937,10 -1626,9 +1991,12 @@@
}
if (crd0.isLocal()) {
+ if (exchangeOnChangeGlobalState && changeGlobalStateE !=null)
+ changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
+
if (allReceived) {
+ awaitSingleMapUpdates();
+
onAllReceived();
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ff4e838,8b74ae6..7cabbd4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@@ -377,10 -374,10 +377,11 @@@ public final class GridNearGetFuture<K
topVer,
subjId,
taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc != null ? expiryPlc.forCreate() : -1L,
expiryPlc != null ? expiryPlc.forAccess() : -1L,
skipVals,
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled(),
+ recovery);
add(fut); // Append new future.
@@@ -441,9 -438,11 +442,9 @@@
// First we peek into near cache.
if (isNear) {
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
- /*swap*/true,
- /*unmarshal*/true,
/**update-metrics*/true,
/*event*/!skipVals,
subjId,
@@@ -577,9 -579,11 +579,9 @@@
boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+ EntryGetResult res = dhtEntry.innerGetVersioned(
null,
null,
- /*swap*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!nearRead && !skipVals,
subjId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index b096d5d,7ca2635..dbf8391
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@@ -137,10 -138,10 +141,11 @@@ public class GridNearGetRequest extend
@NotNull AffinityTopologyVersion topVer,
UUID subjId,
int taskNameHash,
+ long createTtl,
long accessTtl,
boolean skipVals,
- boolean addDepInfo
+ boolean addDepInfo,
+ boolean recovery
) {
assert futId != null;
assert miniId != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------