You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/14 14:43:12 UTC

[5/7] ignite git commit: Merge master into ignite-3477

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1937a5f,8c3c5d1..fc1d9e3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -768,108 -765,92 +770,108 @@@ public abstract class GridCacheAdapter<
  
          PeekModes modes = parsePeekModes(peekModes, false);
  
 -        try {
 -            KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 +        KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 +
 +        CacheObject cacheVal = null;
  
 -            CacheObject cacheVal = null;
 +        if (!ctx.isLocal()) {
 +            AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
  
 -            if (!ctx.isLocal()) {
 -                AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 +            int part = ctx.affinity().partition(cacheKey);
  
 -                int part = ctx.affinity().partition(cacheKey);
 +            boolean nearKey;
  
 -                boolean nearKey;
 +            if (!(modes.near && modes.primary && modes.backup)) {
-                 boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer);
++                boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer);
 +
 +                if (keyPrimary) {
 +                    if (!modes.primary)
 +                        return null;
  
 -                if (!(modes.near && modes.primary && modes.backup)) {
 -                    boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer);
 +                    nearKey = false;
 +                }
 +                else {
-                     boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part, topVer);
++                    boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
  
 -                    if (keyPrimary) {
 -                        if (!modes.primary)
 +                    if (keyBackup) {
 +                        if (!modes.backup)
                              return null;
  
                          nearKey = false;
                      }
                      else {
 -                        boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
 -
 -                        if (keyBackup) {
 -                            if (!modes.backup)
 -                                return null;
 -
 -                            nearKey = false;
 -                        }
 -                        else {
 -                            if (!modes.near)
 -                                return null;
 -
 -                            nearKey = true;
 +                        if (!modes.near)
 +                            return null;
  
 -                            // Swap and offheap are disabled for near cache.
 -                            modes.offheap = false;
 -                            modes.swap = false;
 -                        }
 -                    }
 -                }
 -                else {
 -                    nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
 +                        nearKey = true;
  
 -                    if (nearKey) {
                          // Swap and offheap are disabled for near cache.
                          modes.offheap = false;
 -                        modes.swap = false;
                      }
                  }
 +            }
 +            else {
-                 nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer);
++                nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer);
  
 -                if (nearKey && !ctx.isNear())
 -                    return null;
 +                if (nearKey) {
 +                    // Swap and offheap are disabled for near cache.
 +                    modes.offheap = false;
 +                }
 +            }
  
 -                if (modes.heap) {
 -                    GridCacheEntryEx e = nearKey ? peekEx(cacheKey) :
 -                        (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
 +            if (nearKey && !ctx.isNear())
 +                return null;
  
 -                    if (e != null) {
 -                        cacheVal = e.peek(modes.heap, modes.offheap, modes.swap, topVer, plc);
 +            GridCacheEntryEx e;
 +            GridCacheContext ctx0;
  
 -                        modes.offheap = false;
 -                        modes.swap = false;
 -                    }
 +            while (true) {
 +                if (nearKey) {
 +                    ctx0 = context();
 +                    e = peekEx(key);
 +                }
 +                else {
 +                    ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
 +                    e = modes.offheap ? ctx0.cache().entryEx(key) : ctx0.cache().peekEx(key);
                  }
  
 -                if (modes.offheap || modes.swap) {
 -                    GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 +                if (e != null) {
 +                    try {
 +                        cacheVal = e.peek(modes.heap, modes.offheap, topVer, plc);
 +                    }
 +                    catch (GridCacheEntryRemovedException ignore) {
 +                        if (log.isDebugEnabled())
 +                            log.debug("Got removed entry during 'peek': " + key);
  
 -                    cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
 +                        continue;
 +                    }
 +                    finally {
 +                        ctx0.evicts().touch(e, null);
 +                    }
                  }
 +
 +                break;
              }
 -            else
 -                cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap, modes.swap, plc);
 +        }
 +        else {
 +            while (true) {
 +                try {
 +                    cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap, plc);
  
 -            Object val = ctx.unwrapBinaryIfNeeded(cacheVal, ctx.keepBinary(), false);
 +                    break;
 +                }
 +                catch (GridCacheEntryRemovedException ignore) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Got removed entry during 'peek': " + key);
  
 -            return (V)val;
 +                    // continue
 +                }
 +            }
          }
 -        catch (GridCacheEntryRemovedException ignore) {
 -            if (log.isDebugEnabled())
 -                log.debug("Got removed entry during 'peek': " + key);
  
 -            return null;
 -        }
 +        Object val = ctx.unwrapBinaryIfNeeded(cacheVal, ctx.keepBinary(), false);
 +
 +        return (V)val;
      }
  
      /**
@@@ -1520,31 -1482,31 +1523,31 @@@
          final boolean intercept = ctx.config().getInterceptor() != null;
  
          IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain(
-             new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() {
-                 @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f)
+             new CX1<IgniteInternalFuture<EntryGetResult>, CacheEntry<K, V>>() {
+                 @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<EntryGetResult> f)
                      throws IgniteCheckedException {
-                     T2<V, GridCacheVersion> t = f.get();
+                     EntryGetResult t = f.get();
  
 -                K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
 +                    K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
  
 -                CacheEntry val = t != null ? new CacheEntryImplEx<>(
 -                    key,
 -                    t.value(),
 -                    t.version())
 -                    : null;
 +                    CacheEntry val = t != null ? new CacheEntryImplEx<>(
 +                        key,
-                         t.get1(),
-                         t.get2())
++                        t.value(),
++                        t.version())
 +                        : null;
  
 -                if (intercept) {
 -                    V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
 +                    if (intercept) {
 +                        V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
  
-                         return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.get2() : null) : null;
 -                    return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null;
++                        return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null;
 +                    }
 +                    else
 +                        return val;
                  }
 -                else
 -                    return val;
 -            }
 -        });
 +            });
  
          if (statsEnabled)
-             fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start));
+             fut.listen(new UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start));
  
          return fr;
      }
@@@ -1639,22 -1587,9 +1642,22 @@@
  
          final long start = statsEnabled ? System.nanoTime() : 0L;
  
 +        CacheOperationContext opCtx = ctx.operationContextPerCall();
 +
 +        String taskName = ctx.kernalContext().job().currentTaskName();
 +
-         IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut =
-             (IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>)((IgniteInternalFuture)getAllAsync(
+         IgniteInternalFuture<Map<K, EntryGetResult>> fut =
 -            (IgniteInternalFuture<Map<K, EntryGetResult>>)
 -                ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true));
++            (IgniteInternalFuture<Map<K, EntryGetResult>>)((IgniteInternalFuture)getAllAsync(
 +                keys,
 +                !ctx.config().isReadFromBackup(),
 +                /*skip tx*/false,
 +                opCtx != null ? opCtx.subjectId() : null,
 +                taskName,
 +                !(opCtx != null && opCtx.isKeepBinary()),
 +                opCtx != null && opCtx.recovery(),
 +                /*skip vals*/false,
 +                /*can remap*/true,
 +                /*need ver*/true));
  
          final boolean intercept = ctx.config().getInterceptor() != null;
  
@@@ -1732,7 -1667,9 +1735,7 @@@
       */
      @SuppressWarnings("IfMayBeConditional")
      private Collection<CacheEntry<K, V>> interceptGetEntries(
-         @Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) {
+         @Nullable Collection<? extends K> keys, Map<K, EntryGetResult> map) {
 -        Map<K, CacheEntry<K, V>> res;
 -
          if (F.isEmpty(keys)) {
              assert map.isEmpty();
  
@@@ -1957,23 -1890,14 +1966,23 @@@
          }
  
          if (tx == null || tx.implicit()) {
-             Map<KeyCacheObject, GridCacheVersion> misses = null;
 -            try {
 -                final AffinityTopologyVersion topVer = tx == null ?
 -                    (canRemap ?
 -                        ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
 -                    tx.topologyVersion();
++            Map<KeyCacheObject, EntryGetResult> misses = null;
  
 +            final AffinityTopologyVersion topVer = tx == null ?
 +                (canRemap ?
 +                    ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
 +                tx.topologyVersion();
 +
 +            try {
                  int keysSize = keys.size();
  
 +                GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture();
 +
 +                Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null;
 +
 +                if (ex != null)
 +                    return new GridFinishedFuture<>(ex);
 +
                  final Map<K1, V1> map = keysSize == 1 ?
                      (Map<K1, V1>)new IgniteBiTuple<>() :
                      U.<K1, V1>newHashMap(keysSize);
@@@ -1992,33 -1920,55 +2001,53 @@@
                          }
  
                          try {
-                             T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
-                                 null,
-                                 null,
-                                 /*update-metrics*/!skipVals,
-                                 /*event*/!skipVals,
-                                 subjId,
-                                 null,
-                                 taskName,
-                                 expiry,
-                                 !deserializeBinary);
- 
-                             if (res == null) {
-                                 if (storeEnabled) {
-                                     GridCacheVersion ver = entry.version();
+                             EntryGetResult res;
+ 
+                             boolean evt = !skipVals;
+                             boolean updateMetrics = !skipVals;
+ 
+                             if (storeEnabled) {
 -                                res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
 -                                    updateMetrics,
++                                res = entry.innerGetAndReserveForLoad(updateMetrics,
+                                     evt,
+                                     subjId,
+                                     taskName,
+                                     expiry,
+                                     !deserializeBinary,
+                                     readerArgs);
+ 
+                                 assert res != null;
  
+                                 if (res.value() == null) {
                                      if (misses == null)
                                          misses = new HashMap<>();
  
-                                     misses.put(key, ver);
+                                     misses.put(key, res);
+ 
+                                     res = null;
                                  }
-                                 else
-                                     ctx.evicts().touch(entry, topVer);
                              }
                              else {
+                                 res = entry.innerGetVersioned(
+                                     null,
+                                     null,
 -                                    ctx.isSwapOrOffheapEnabled(),
+                                     /*unmarshal*/true,
+                                     updateMetrics,
+                                     evt,
+                                     subjId,
+                                     null,
+                                     taskName,
+                                     expiry,
+                                     !deserializeBinary,
+                                     readerArgs);
+ 
+                                 if (res == null)
+                                     ctx.evicts().touch(entry, topVer);
+                             }
+ 
+                             if (res != null) {
                                  ctx.addResult(map,
                                      key,
-                                     res.get1(),
+                                     res,
                                      skipVals,
                                      keepCacheObjects,
                                      deserializeBinary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 26e4ed3,f47e9f3..8e6a9ec
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@@ -305,9 -324,11 +306,9 @@@ public interface GridCacheEntryEx 
       * @throws IgniteCheckedException If loading value failed.
       * @throws GridCacheEntryRemovedException If entry was removed.
       */
-     @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+     public EntryGetResult innerGetVersioned(
          @Nullable GridCacheVersion ver,
          IgniteInternalTx tx,
 -        boolean readSwap,
 -        boolean unmarshal,
          boolean updateMetrics,
          boolean evt,
          UUID subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d28ea25,2237e22..846c633
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -103,6 -99,15 +102,9 @@@ public abstract class GridCacheMapEntr
      private static final byte IS_UNSWAPPED_MASK = 0x02;
  
      /** */
 -    private static final byte IS_OFFHEAP_PTR_MASK = 0x04;
 -
 -    /** */
 -    private static final byte IS_SWAPPING_REQUIRED = 0x08;
 -
 -    /** */
 -    private static final byte IS_EVICT_DISABLED = 0x10;
++    private static final byte IS_EVICT_DISABLED = 0x04;
+ 
+     /** */
      public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator();
  
      /**
@@@ -479,24 -776,56 +481,51 @@@
              taskName,
              expirePlc,
              false,
-             keepBinary);
+             keepBinary,
+             false,
+             null);
+     }
+ 
+     /** {@inheritDoc} */
 -    @Override public EntryGetResult innerGetAndReserveForLoad(boolean readSwap,
 -        boolean updateMetrics,
++    @Override public EntryGetResult innerGetAndReserveForLoad(boolean updateMetrics,
+         boolean evt,
+         UUID subjId,
+         String taskName,
+         @Nullable IgniteCacheExpiryPolicy expiryPlc,
+         boolean keepBinary,
+         @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException {
+         return (EntryGetResult)innerGet0(
+             /*ver*/null,
+             /*tx*/null,
 -            readSwap,
+             /*readThrough*/false,
+             evt,
+             updateMetrics,
 -            /*tmp*/false,
+             subjId,
+             /*transformClo*/null,
+             taskName,
+             expiryPlc,
+             true,
+             keepBinary,
+             /*reserve*/true,
+             readerArgs);
      }
  
      /** {@inheritDoc} */
-     @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+     @Override public EntryGetResult innerGetVersioned(
          @Nullable GridCacheVersion ver,
          IgniteInternalTx tx,
 -        boolean readSwap,
 -        boolean unmarshal,
          boolean updateMetrics,
          boolean evt,
          UUID subjId,
          Object transformClo,
          String taskName,
          @Nullable IgniteCacheExpiryPolicy expiryPlc,
-         boolean keepBinary)
+         boolean keepBinary,
+         @Nullable ReaderArguments readerArgs)
          throws IgniteCheckedException, GridCacheEntryRemovedException {
-         return (T2<CacheObject, GridCacheVersion>)innerGet0(
 -        return (EntryGetResult)innerGet0(ver,
++        return (EntryGetResult)innerGet0(
 +            ver,
              tx,
 -            readSwap,
              false,
              evt,
              updateMetrics,
@@@ -624,14 -965,28 +658,27 @@@
  
              // Cache version for optimistic check.
              startVer = ver;
-         }
  
-         if (ret != null) {
-             assert !obsolete;
-             assert !deferred;
+             addReaderIfNeed(readerArgs);
+ 
+             if (ret != null) {
 -                assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
+                 assert !obsolete;
+                 assert !deferred;
+ 
+                 // If return value is consistent, then done.
+                 res = retVer ? entryGetResult(ret, resVer, false) : ret;
+             }
+             else if (reserveForLoad && !obsolete) {
+                 assert !readThrough;
+                 assert retVer;
+ 
+                 boolean reserve = !evictionDisabled();
+ 
+                 if (reserve)
+                     flags |= IS_EVICT_DISABLED;
  
-             // If return value is consistent, then done.
-             return retVer ? new T2<>(ret, resVer) : ret;
+                 res = entryGetResult(null, resVer, reserve);
+             }
          }
  
          if (obsolete) {
@@@ -682,8 -1046,20 +734,10 @@@
  
                      update(ret, expTime, ttl, nextVer, true);
  
 -                    if (hadValPtr && cctx.offheapTiered()) {
 -                        if (log.isTraceEnabled()) {
 -                            log.trace("innerGet removeOffheap [key=" + key +
 -                                ", entry=" + System.identityHashCode(this) +
 -                                ", ptr=" + offHeapPointer() + ']');
 -                        }
 -
 -                        cctx.swap().removeOffheap(key);
 -                    }
 -
                      if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
                          deletedUnlocked(false);
+ 
+                     assert readerArgs == null;
                  }
  
                  if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
@@@ -2170,8 -3077,21 +2238,9 @@@
              value(null);
  
              ver = newVer;
+             flags &= ~IS_EVICT_DISABLED;
  
 -            if (log.isTraceEnabled()) {
 -                log.trace("invalidate releaseSwap [key=" + key +
 -                    ", entry=" + System.identityHashCode(this) +
 -                    ", val=" + val +
 -                    ", ptr=" + offHeapPointer() +
 -                    ']');
 -            }
 -
 -            releaseSwap();
 -
 -            clearIndex(val);
 +            removeValue();
  
              onInvalidate();
          }
@@@ -2249,9 -3168,10 +2318,10 @@@
          ttlAndExpireTimeExtras(ttl, expireTime);
  
          this.ver = ver;
+         flags &= ~IS_EVICT_DISABLED;
  
 -        if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
 -            cctx.ttl().addTrackedEntry(this);
 +        if (trackNear && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()))
 +            cctx.ttl().addTrackedEntry((GridNearCacheEntry)this);
      }
  
      /**
@@@ -2421,18 -3394,8 +2491,18 @@@
          }
      }
  
 +    /** {@inheritDoc} */
 +    @Nullable @Override public CacheObject peek(@Nullable IgniteCacheExpiryPolicy plc)
 +        throws GridCacheEntryRemovedException, IgniteCheckedException {
 +        IgniteInternalTx tx = cctx.tm().localTxx();
 +
 +        AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion();
 +
 +        return peek(true, false, topVer, plc);
 +    }
 +
      /**
-      * TODO: GG-4009: do we need to generate event and invalidate value?
+      * TODO: IGNITE-3500: do we need to generate event and invalidate value?
       *
       * @return {@code true} if expired.
       * @throws IgniteCheckedException In case of failure.

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 46a20ab,86dd4ea..f339f46
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -57,8 -54,8 +56,9 @@@ import org.apache.ignite.internal.Ignit
  import org.apache.ignite.internal.IgniteInterruptedCheckedException;
  import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
  import org.apache.ignite.internal.events.DiscoveryCustomEvent;
- import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
+ import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 +import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
@@@ -222,16 -210,19 +220,18 @@@ public class GridCachePartitionExchange
                          }
                      }
  
-                     assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() :
 -                    assert
 -                        evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
++                    assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
                          "Node joined with smaller-than-local " +
--                            "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
++                        "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
  
-                     exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+                     exchId = exchangeId(n.id(),
+                         affinityTopologyVersion(evt),
+                         evt.type());
  
-                     exchFut = exchangeFuture(exchId, e, null, null);
+                     exchFut = exchangeFuture(exchId, evt, cache,null, null);
                  }
                  else {
-                     DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
+                     DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
  
                      if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
                          DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
@@@ -260,11 -251,10 +260,10 @@@
                              }
                          }
  
-                         //todo think about refactoring
 -                        if (!F.isEmpty(valid)) {
 +                        if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
-                             exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+                             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
  
-                             exchFut = exchangeFuture(exchId, e, valid, null);
+                             exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                          }
                      }
                      else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
@@@ -278,14 -268,8 +277,14 @@@
                              }
                          }
                          else
-                             exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                             exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
                      }
 +                    else if (customEvt.customMessage() instanceof StartFullSnapshotAckDiscoveryMessage
 +                        && !((StartFullSnapshotAckDiscoveryMessage)customEvt.customMessage()).hasError()) {
-                         exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
++                        exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 +
-                         exchFut = exchangeFuture(exchId, e, null, null);
++                        exchFut = exchangeFuture(exchId, evt, null, null, null);
 +                    }
                  }
  
                  if (exchId != null) {
@@@ -1303,8 -1291,11 +1307,12 @@@
              }
              else {
                  if (msg.client()) {
 -                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
 +                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(
-                         msg.exchangeId(), null, null, null);
++                        msg.exchangeId(),
+                         null,
+                         null,
+                         null,
+                         null);
  
                      exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                          @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 987d696,b016883..c18dbcf
mode 100644,100755..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -880,48 -857,33 +891,59 @@@ public class GridCacheProcessor extend
  
          assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
          assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
+ 
+         if (!ctx.clientNode() && !ctx.isDaemon())
+             addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
+ 
+     }
+ 
+     /**
+      * @param timeout Cleanup timeout.
+      */
+     private void addRemovedItemsCleanupTask(long timeout) {
+         ctx.timeout().addTimeoutObject(new RemovedItemsCleanupTask(timeout));
      }
  
 -    /** {@inheritDoc} */
 -    @SuppressWarnings("unchecked")
 -    @Override public void stop(boolean cancel) throws IgniteCheckedException {
 -        for (String cacheName : stopSeq) {
 -            GridCacheAdapter<?, ?> cache = stoppedCaches.remove(maskNull(cacheName));
 +    /**
 +     *
 +     */
 +    private void checkConsistency() throws IgniteCheckedException {
 +        if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
 +            for (ClusterNode n : ctx.discovery().remoteNodes()) {
 +                if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
 +                    continue;
  
 -            if (cache != null)
 -                stopCache(cache, cancel);
 -        }
 +                checkTransactionConfiguration(n);
  
 -        for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) {
 -            if (cache == stoppedCaches.remove(maskNull(cache.name())))
 -                stopCache(cache, cancel);
 +                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
 +                DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 +
 +                CU.checkAttributeMismatch(
 +                    log, null, n.id(), "deploymentMode", "Deployment mode",
 +                    locDepMode, rmtDepMode, true);
 +
 +                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
 +                    CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
 +
 +                    if (rmtCfg != null) {
 +                        CacheConfiguration locCfg = desc.cacheConfiguration();
 +
 +                        checkCache(locCfg, rmtCfg, n);
 +
 +                        // Check plugin cache configurations.
 +                        CachePluginManager pluginMgr = desc.pluginManager();
 +
 +                        pluginMgr.validateRemotes(rmtCfg, n);
 +                    }
 +                }
 +            }
          }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public void stop(boolean cancel) throws IgniteCheckedException {
 +        stopCaches(cancel);
  
          List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index ca71f51,9c4e4ef..eb23c43
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@@ -30,8 -30,8 +30,9 @@@ import java.util.concurrent.locks.Reent
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.IgniteLogger;
  import org.apache.ignite.cluster.ClusterNode;
 +import org.apache.ignite.events.DiscoveryEvent;
  import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@@ -41,10 -41,7 +42,8 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.util.GridAtomicLong;
  import org.apache.ignite.internal.util.tostring.GridToStringExclude;
  import org.apache.ignite.internal.util.typedef.F;
 +import org.apache.ignite.internal.util.typedef.T2;
  import org.apache.ignite.internal.util.typedef.X;
- import org.apache.ignite.internal.util.typedef.internal.CU;
  import org.apache.ignite.internal.util.typedef.internal.U;
  import org.jetbrains.annotations.Nullable;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index fba1877,58dbbcf..ac6eee3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@@ -650,10 -657,10 +654,11 @@@ public abstract class GridDhtCacheAdapt
          String taskName,
          @Nullable IgniteCacheExpiryPolicy expiry,
          boolean skipVals,
 -        boolean canRemap
 +        boolean canRemap,
 +        boolean recovery
      ) {
          return getAllAsync0(keys,
+             readerArgs,
              readThrough,
              /*don't check local tx. */false,
              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 2c3435d,d0d801a..5fe0ef4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@@ -392,32 -390,22 +395,23 @@@ public final class GridDhtGetFuture<K, 
                  txFut.markInitialized();
          }
  
-         IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
+         IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut;
  
          if (txFut == null || txFut.isDone()) {
-             if (tx == null) {
-                 fut = cache().getDhtAllAsync(
-                     keys.keySet(),
-                     readThrough,
-                     subjId,
-                     taskName,
-                     expiryPlc,
-                     skipVals,
-                     /*can remap*/true,
-                     recovery);
-             }
-             else {
-                 fut = tx.getAllAsync(cctx,
-                     null,
-                     keys.keySet(),
-                     /*deserialize binary*/false,
-                     skipVals,
-                     /*keep cache objects*/true,
-                     /*skip store*/!readThrough,
-                     false);
-             }
+             fut = cache().getDhtAllAsync(
+                 keys.keySet(),
+                 readerArgs,
+                 readThrough,
+                 subjId,
+                 taskName,
+                 expiryPlc,
+                 skipVals,
 -                /*can remap*/true);
++                /*can remap*/true,
++                recovery);
          }
          else {
+             final ReaderArguments args = readerArgs;
+ 
              // If we are here, then there were active transactions for some entries
              // when we were adding the reader. In that case we must wait for those
              // transactions to complete.
@@@ -428,27 -416,15 +422,16 @@@
                          if (e != null)
                              throw new GridClosureException(e);
  
-                         if (tx == null) {
-                             return cache().getDhtAllAsync(
-                                 keys.keySet(),
-                                 readThrough,
-                                 subjId,
-                                 taskName,
-                                 expiryPlc,
-                                 skipVals,
-                                 /*can remap*/true,
-                                 recovery);
-                         }
-                         else {
-                             return tx.getAllAsync(cctx,
-                                 null,
-                                 keys.keySet(),
-                                 /*deserialize binary*/false,
-                                 skipVals,
-                                 /*keep cache objects*/true,
-                                 /*skip store*/!readThrough,
-                                 false);
-                         }
+                         return cache().getDhtAllAsync(
+                             keys.keySet(),
+                             args,
+                             readThrough,
+                             subjId,
+                             taskName,
+                             expiryPlc,
+                             skipVals,
 -                            /*can remap*/true);
++                            /*can remap*/true,
++                            recovery);
                      }
                  }
              );

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 81d2570,33f4661..9cc69b5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@@ -350,32 -348,22 +353,23 @@@ public final class GridDhtGetSingleFutu
              }
          }
  
-         IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
+         IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut;
  
          if (rdrFut == null || rdrFut.isDone()) {
-             if (tx == null) {
-                 fut = cache().getDhtAllAsync(
-                     Collections.singleton(key),
-                     readThrough,
-                     subjId,
-                     taskName,
-                     expiryPlc,
-                     skipVals,
-                     /*can remap*/true,
-                     recovery);
-             }
-             else {
-                 fut = tx.getAllAsync(cctx,
-                     null,
-                     Collections.singleton(key),
-                     /*deserialize binary*/false,
-                     skipVals,
-                     /*keep cache objects*/true,
-                     /*skip store*/!readThrough,
-                     false);
-             }
+             fut = cache().getDhtAllAsync(
+                 Collections.singleton(key),
+                 readerArgs,
+                 readThrough,
+                 subjId,
+                 taskName,
+                 expiryPlc,
+                 skipVals,
 -                /*can remap*/true);
++                /*can remap*/true,
++                recovery);
          }
          else {
+             final ReaderArguments args = readerArgs;
+ 
              rdrFut.listen(
                  new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                      @Override public void apply(IgniteInternalFuture<Boolean> fut) {
@@@ -397,20 -384,7 +390,8 @@@
                                  taskName,
                                  expiryPlc,
                                  skipVals,
 -                                /*can remap*/true);
 +                                /*can remap*/true,
 +                                recovery);
-                         }
-                         else {
-                             fut0 = tx.getAllAsync(cctx,
-                                 null,
-                                 Collections.singleton(key),
-                                 /*deserialize binary*/false,
-                                 skipVals,
-                                 /*keep cache objects*/true,
-                                 /*skip store*/!readThrough,
-                                 false
-                             );
-                         }
  
                          fut0.listen(createGetFutureListener());
                      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 9b30593,9f8498a..b1fe6ec
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@@ -49,24 -44,23 +49,28 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
  import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 +import org.apache.ignite.internal.util.GridCircularBuffer;
+ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
  import org.apache.ignite.internal.util.future.GridFutureAdapter;
 -import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 +import org.apache.ignite.internal.util.lang.GridIterator;
  import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 +import org.apache.ignite.internal.util.typedef.CI1;
 +import org.apache.ignite.internal.util.typedef.T2;
+ import org.apache.ignite.internal.util.typedef.F;
  import org.apache.ignite.internal.util.typedef.internal.CU;
  import org.apache.ignite.internal.util.typedef.internal.S;
  import org.apache.ignite.internal.util.typedef.internal.U;
  import org.apache.ignite.lang.IgniteUuid;
  import org.jetbrains.annotations.NotNull;
  import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentLinkedDeque8;
  
  import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
+ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
  import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 +import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
@@@ -160,22 -149,9 +169,24 @@@ public class GridDhtLocalPartition impl
          int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
              Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
  
-         rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
+         rmvQueueMaxSize = U.ceilPow2(delQueueSize);
+ 
+         rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
 +
 +        try {
 +            store = cctx.offheap().createCacheDataStore(id);
 +        }
 +        catch (IgniteCheckedException e) {
 +            // TODO ignite-db
 +            throw new IgniteException(e);
 +        }
 +    }
 +
 +    /**
 +     * @return Data store.
 +     */
 +    public CacheDataStore dataStore() {
 +        return store;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index ac3e2c8,bdd84b0..4f8de4e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@@ -17,10 -17,8 +17,9 @@@
  
  package org.apache.ignite.internal.processors.cache.distributed.dht;
  
- import java.util.Collection;
  import java.util.List;
  import java.util.Map;
 +import java.util.Set;
  import java.util.UUID;
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.cluster.ClusterNode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 322bbe3,84ff96b..04188fd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -30,18 -30,15 +30,19 @@@ import java.util.Set
  import java.util.UUID;
  import java.util.concurrent.atomic.AtomicReferenceArray;
  import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteException;
  import org.apache.ignite.IgniteLogger;
  import org.apache.ignite.IgniteSystemProperties;
 +import org.apache.ignite.cache.PartitionLossPolicy;
  import org.apache.ignite.cluster.ClusterNode;
  import org.apache.ignite.events.DiscoveryEvent;
 +import org.apache.ignite.events.EventType;
  import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
  import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
  import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 +import org.apache.ignite.internal.processors.cache.ClusterState;
  import org.apache.ignite.internal.processors.cache.GridCacheContext;
  import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@@ -503,86 -468,70 +504,86 @@@ class GridDhtPartitionTopologyImpl impl
  
          ClusterNode loc = cctx.localNode();
  
 -        U.writeLock(lock);
 +        cctx.shared().database().checkpointReadLock();
  
 -        try {
 -            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 +        synchronized (cctx.shared().exchange().interruptLock()) {
 +            if (Thread.currentThread().isInterrupted())
 +                throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
  
 -            if (stopping)
 -                return;
 +            try {
 +                U.writeLock(lock);
 +            }
 +            catch (IgniteInterruptedCheckedException e) {
 +                cctx.shared().database().checkpointReadUnlock();
  
 -            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
 -                topVer + ", exchId=" + exchId + ']';
 +                throw e;
 +            }
  
 -            if (exchId.isLeft())
 -                removeNode(exchId.nodeId());
 +            try {
 +                GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
  
 -            ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 +                if (stopping)
 +                    return;
  
 -            if (log.isDebugEnabled())
 -                log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
 +                assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
 +                    topVer + ", exchId=" + exchId + ']';
  
 -            long updateSeq = this.updateSeq.incrementAndGet();
 +                if (exchId.isLeft())
 +                    removeNode(exchId.nodeId());
  
-                 ClusterNode oldest = currentCoordinator();
 -            cntrMap.clear();
++                ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
  
 -            // If this is the oldest node.
 -            if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
 -                if (node2part == null) {
 -                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 +                if (log.isDebugEnabled())
 +                    log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
  
 -                    if (log.isDebugEnabled())
 -                        log.debug("Created brand new full topology map on oldest node [exchId=" +
 -                            exchId + ", fullMap=" + fullMapString() + ']');
 -                }
 -                else if (!node2part.valid()) {
 -                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 +                long updateSeq = this.updateSeq.incrementAndGet();
  
 -                    if (log.isDebugEnabled())
 -                        log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
 -                            node2part + ']');
 +                cntrMap.clear();
 +
 +                // If this is the oldest node.
 +                if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
 +                    if (node2part == null) {
 +                        node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 +
 +                        if (log.isDebugEnabled())
 +                            log.debug("Created brand new full topology map on oldest node [exchId=" +
 +                                exchId + ", fullMap=" + fullMapString() + ']');
 +                    }
 +                    else if (!node2part.valid()) {
 +                        node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 +
 +                        if (log.isDebugEnabled())
 +                            log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
 +                                node2part + ']');
 +                    }
 +                    else if (!node2part.nodeId().equals(loc.id())) {
 +                        node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 +
 +                        if (log.isDebugEnabled())
 +                            log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
 +                                exchId + ", fullMap=" + fullMapString() + ']');
 +                    }
                  }
 -                else if (!node2part.nodeId().equals(loc.id())) {
 -                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
  
 -                    if (log.isDebugEnabled())
 -                        log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
 -                            exchId + ", fullMap=" + fullMapString() + ']');
 +                if (affReady)
 +                    initPartitions0(exchFut, updateSeq);
 +                else {
 +                    List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
 +
 +                    createPartitions(aff, updateSeq);
                  }
 -            }
  
 -            if (affReady)
 -                initPartitions0(exchFut, updateSeq);
 -            else {
 -                List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
 +                consistencyCheck();
  
 -                createPartitions(aff, updateSeq);
 +                if (log.isDebugEnabled())
 +                    log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
 +                        fullMapString() + ']');
              }
 +            finally {
 +                lock.writeLock().unlock();
  
 -            consistencyCheck();
 -
 -            if (log.isDebugEnabled())
 -                log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
 -                    fullMapString() + ']');
 -        }
 -        finally {
 -            lock.writeLock().unlock();
 +                cctx.shared().database().checkpointReadUnlock();
 +            }
          }
  
          // Wait for evictions.
@@@ -761,14 -692,12 +762,14 @@@
          try {
              loc = locParts.get(p);
  
 +            state = loc != null ? loc.state() : null;
 +
-             boolean belongs = cctx.affinity().localNode(p, topVer);
+             boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
  
 -            if (loc != null && loc.state() == EVICTED) {
 +            if (loc != null && state == EVICTED) {
                  locParts.set(p, loc = null);
  
 -                if (!belongs)
 +                if (!treatAllPartAsLoc && !belongs)
                      throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
                          "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
                          "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@@ -835,9 -750,10 +836,10 @@@
          for (int i = 0; i < locParts.length(); i++) {
              GridDhtLocalPartition part = locParts.get(i);
  
 -            if (part != null)
 +            if (part != null && part.state().active())
                  list.add(part);
          }
+ 
          return list;
      }
  
@@@ -1065,11 -981,9 +1067,11 @@@
  
      /** {@inheritDoc} */
      @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-     @Nullable @Override public GridDhtPartitionMap2 update(
 -    @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
++    @Override public GridDhtPartitionMap2 update(
 +        @Nullable GridDhtPartitionExchangeId exchId,
          GridDhtPartitionFullMap partMap,
 -        @Nullable Map<Integer, Long> cntrMap) {
 +        @Nullable Map<Integer, T2<Long, Long>> cntrMap
 +    ) {
          if (log.isDebugEnabled())
              log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6c4da68,519239a..0ef8bb8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@@ -353,10 -341,10 +354,11 @@@ public class GridPartitionedGetFuture<K
                      topVer,
                      subjId,
                      taskName == null ? 0 : taskName.hashCode(),
+                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                      expiryPlc != null ? expiryPlc.forAccess() : -1L,
                      skipVals,
 -                    cctx.deploymentEnabled());
 +                    cctx.deploymentEnabled(),
 +                    recovery);
  
                  add(fut); // Append new future.
  
@@@ -461,9 -452,11 +464,9 @@@
                      GridCacheVersion ver = null;
  
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         getRes = entry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/false,
                              /*event*/!skipVals,
                              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index ea69743,a3f6b72..0da3a44
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@@ -316,10 -300,10 +317,11 @@@ public class GridPartitionedSingleGetFu
                      topVer,
                      subjId,
                      taskName == null ? 0 : taskName.hashCode(),
+                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                      expiryPlc != null ? expiryPlc.forAccess() : -1L,
                      skipVals,
 -                    cctx.deploymentEnabled());
 +                    cctx.deploymentEnabled(),
 +                    recovery);
              }
  
              try {
@@@ -388,9 -375,11 +390,9 @@@
                      GridCacheVersion ver = null;
  
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         EntryGetResult res = entry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/false,
                              /*event*/!skipVals,
                              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 72489fd,cebf4ae..ef8150c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -555,11 -547,8 +555,10 @@@ public class GridDhtAtomicCache<K, V> e
      /** {@inheritDoc} */
      @Override protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
          throws IgniteCheckedException {
 +        CacheOperationContext opCtx = ctx.operationContextPerCall();
 +
          return getAllAsyncInternal(keys,
              !ctx.config().isReadFromBackup(),
-             true,
              null,
              ctx.kernalContext().job().currentTaskName(),
              deserializeBinary,
@@@ -1607,10 -1584,12 +1605,10 @@@
                              GridCacheVersion ver = null;
  
                              if (needVer) {
-                                 T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                 getRes = entry.innerGetVersioned(
                                      null,
                                      null,
 -                                    /*swap*/true,
 -                                    /*unmarshal*/true,
 -                                    /**update-metrics*/false,
 +                                    /*update-metrics*/false,
                                      /*event*/!skipVals,
                                      subjId,
                                      null,
@@@ -3209,10 -3186,11 +3212,11 @@@
       * @param nodeId Sender node ID.
       * @param res Near atomic update response.
       */
 -    @SuppressWarnings("unchecked")
      private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
          if (msgLog.isDebugEnabled())
-             msgLog.debug("Received near atomic update response [futId" + res.futureVersion() +
+             msgLog.debug("Received near atomic update response " +
 -                "[futId=" + res.futureVersion() + ", node=" + nodeId + ']');
++                "[futId=" + res.futureVersion() +
 +                ", node=" + nodeId + ']');
  
          res.nodeId(ctx.localNodeId());
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d5e8389,e1e0ec2..d86dc91
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -476,9 -473,11 +478,9 @@@ public class GridDhtColocatedCache<K, V
                              GridCacheVersion ver = null;
  
                              if (needVer) {
-                                 T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                 getRes = entry.innerGetVersioned(
                                      null,
                                      null,
 -                                    /*swap*/true,
 -                                    /*unmarshal*/true,
                                      /**update-metrics*/false,
                                      /*event*/!skipVals,
                                      subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 358ec8f,79ca108..31cff03
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -911,37 -911,38 +917,38 @@@ public final class GridDhtColocatedLock
                                          !topLocked &&
                                          (tx == null || !tx.hasRemoteLocks());
  
 -                                        first = false;
 -                                    }
 -
 -                                    req = new GridNearLockRequest(
 -                                        cctx.cacheId(),
 -                                        topVer,
 -                                        cctx.nodeId(),
 -                                        threadId,
 -                                        futId,
 -                                        lockVer,
 -                                        inTx(),
 -                                        implicitTx(),
 -                                        implicitSingleTx(),
 -                                        read,
 -                                        retval,
 -                                        isolation(),
 -                                        isInvalidate(),
 -                                        timeout,
 -                                        mappedKeys.size(),
 -                                        inTx() ? tx.size() : mappedKeys.size(),
 -                                        inTx() && tx.syncMode() == FULL_SYNC,
 -                                        inTx() ? tx.subjectId() : null,
 -                                        inTx() ? tx.taskNameHash() : 0,
 -                                        read ? createTtl : -1L,
 +                                    first = false;
 +                                }
 +
 +                                req = new GridNearLockRequest(
 +                                    cctx.cacheId(),
 +                                    topVer,
 +                                    cctx.nodeId(),
 +                                    threadId,
 +                                    futId,
 +                                    lockVer,
 +                                    inTx(),
 +                                    implicitTx(),
 +                                    implicitSingleTx(),
 +                                    read,
 +                                    retval,
 +                                    isolation(),
 +                                    isInvalidate(),
 +                                    timeout,
 +                                    mappedKeys.size(),
 +                                    inTx() ? tx.size() : mappedKeys.size(),
 +                                    inTx() && tx.syncMode() == FULL_SYNC,
 +                                    inTx() ? tx.subjectId() : null,
 +                                    inTx() ? tx.taskNameHash() : 0,
-                                     read ? accessTtl : -1L,
++                                    read ? createTtl : -1L,
+                                         read ? accessTtl : -1L,
 -                                        skipStore,
 -                                        keepBinary,
 -                                        clientFirst,
 -                                        cctx.deploymentEnabled());
 +                                    skipStore,
 +                                    keepBinary,
 +                                    clientFirst,
 +                                    cctx.deploymentEnabled());
  
 -                                    mapping.request(req);
 -                                }
 +                                mapping.request(req);
 +                            }
  
                              distributedKeys.add(key);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 97d768a,9942423..b80ad04
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -272,24 -288,22 +272,24 @@@ class GridDhtPartitionSupplier 
                      boolean partMissing = false;
  
                      if (phase == SupplyContextPhase.NEW)
 -                        phase = SupplyContextPhase.ONHEAP;
 +                        phase = SupplyContextPhase.OFFHEAP;
 +
 +                    if (phase == SupplyContextPhase.OFFHEAP) {
 +                        IgniteRebalanceIterator iter;
 +
 +                        if (sctx == null || sctx.entryIt == null) {
 +                            iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
  
 -                    if (phase == SupplyContextPhase.ONHEAP) {
 -                        Iterator<GridCacheMapEntry> entIt = sctx != null ?
 -                            (Iterator<GridCacheMapEntry>)sctx.entryIt : loc.allEntries().iterator();
 +                            if (!iter.historical())
 +                                s.clean(part);
 +                        }
 +                        else
 +                            iter = (IgniteRebalanceIterator)sctx.entryIt;
  
 -                        while (entIt.hasNext()) {
 +                        while (iter.hasNext()) {
-                             if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                             if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
 -                                // Demander no longer needs this partition, so we send '-1' partition and move on.
 +                                // Demander no longer needs this partition,
 +                                // so we send '-1' partition and move on.
                                  s.missed(part);
  
                                  if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index af0085d,46fb144..c33dc7b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -46,9 -44,7 +46,9 @@@ import org.apache.ignite.internal.Ignit
  import org.apache.ignite.internal.IgniteInterruptedCheckedException;
  import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
  import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
- import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 +import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
  import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@@ -488,9 -455,9 +502,11 @@@ public class GridDhtPartitionsExchangeF
          assert !dummy && !forcePreload : this;
  
          try {
+             discoCache.updateAlives(cctx.discovery());
+ 
 +            AffinityTopologyVersion topVer = topologyVersion();
 +
-             srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVer));
+             srvNodes = new ArrayList<>(discoCache.serverNodes());
  
              remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
  
@@@ -1324,13 -1118,10 +1340,12 @@@
       * Cleans up resources to avoid excessive memory usage.
       */
      public void cleanUp() {
-         topSnapshot.set(null);
          singleMsgs.clear();
          fullMsgs.clear();
 +        changeGlobalStateExceptions.clear();
          crd = null;
          partReleaseFut = null;
 +        changeGlobalStateE = null;
      }
  
      /**
@@@ -1402,11 -1194,10 +1418,13 @@@
  
              if (crd.isLocal()) {
                  if (remaining.remove(node.id())) {
-                     updatePartitionSingleMap(node, msg);
+                     updateSingleMap = true;
+ 
+                     pendingSingleUpdates++;
  
 +                    if (exchangeOnChangeGlobalState && msg.getException() != null)
 +                        changeGlobalStateExceptions.put(node.id(), msg.getException());
 +
                      allReceived = remaining.isEmpty();
                  }
              }
@@@ -1414,8 -1205,42 +1432,42 @@@
                  singleMsgs.put(node, msg);
          }
  
-         if (allReceived)
+         if (updateSingleMap) {
+             try {
 -                updatePartitionSingleMap(msg);
++                updatePartitionSingleMap(node, msg);
+             }
+             finally {
+                 synchronized (mux) {
+                     assert pendingSingleUpdates > 0;
+ 
+                     pendingSingleUpdates--;
+ 
+                     if (pendingSingleUpdates == 0)
+                         mux.notifyAll();
+                 }
+             }
+         }
+ 
+         if (allReceived) {
+             awaitSingleMapUpdates();
+ 
              onAllReceived();
+         }
+     }
+ 
+     /**
+      *
+      */
+     private void awaitSingleMapUpdates() {
+         synchronized (mux) {
+             try {
+                 while (pendingSingleUpdates > 0)
+                     U.wait(mux);
+             }
+             catch (IgniteInterruptedCheckedException e) {
+                 U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+             }
+         }
      }
  
      /**
@@@ -1541,10 -1272,10 +1593,10 @@@
          try {
              assert crd.isLocal();
  
-             if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+             if (!crd.equals(discoCache.serverNodes().get(0))) {
                  for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                      if (!cacheCtx.isLocal())
 -                        cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
 +                        cacheCtx.topology().beforeExchange(this, !centralizedAff);
                  }
              }
  
@@@ -1937,10 -1626,9 +1991,12 @@@
                          }
  
                          if (crd0.isLocal()) {
 +                            if (exchangeOnChangeGlobalState && changeGlobalStateE !=null)
 +                                changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
 +
                              if (allReceived) {
+                                 awaitSingleMapUpdates();
+ 
                                  onAllReceived();
  
                                  return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ff4e838,8b74ae6..7cabbd4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@@ -377,10 -374,10 +377,11 @@@ public final class GridNearGetFuture<K
                      topVer,
                      subjId,
                      taskName == null ? 0 : taskName.hashCode(),
+                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                      expiryPlc != null ? expiryPlc.forAccess() : -1L,
                      skipVals,
 -                    cctx.deploymentEnabled());
 +                    cctx.deploymentEnabled(),
 +                    recovery);
  
                  add(fut); // Append new future.
  
@@@ -441,9 -438,11 +442,9 @@@
                  // First we peek into near cache.
                  if (isNear) {
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         EntryGetResult res = entry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/true,
                              /*event*/!skipVals,
                              subjId,
@@@ -577,9 -579,11 +579,9 @@@
                      boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
  
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+                         EntryGetResult res = dhtEntry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/false,
                              /*event*/!nearRead && !skipVals,
                              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index b096d5d,7ca2635..dbf8391
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@@ -137,10 -138,10 +141,11 @@@ public class GridNearGetRequest extend
          @NotNull AffinityTopologyVersion topVer,
          UUID subjId,
          int taskNameHash,
+         long createTtl,
          long accessTtl,
          boolean skipVals,
 -        boolean addDepInfo
 +        boolean addDepInfo,
 +        boolean recovery
      ) {
          assert futId != null;
          assert miniId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------