You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/24 14:12:34 UTC

[44/50] [abbrv] ignite git commit: Merge branch master ignite-2.0 to ignite-3477

http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 6bf96b0,81606d4..fe8d654
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -284,10 -326,2090 +326,2091 @@@ public class GridNearTxLocal extends Gr
      }
  
      /**
-      * @return {@code True} if transaction contains colocated key mapped to the local node.
+      * @return {@code True} if transaction contains colocated key mapped to the local node.
+      */
+     public boolean colocatedLocallyMapped() {
+         return colocatedLocallyMapped;
+     }
+ 
+     /**
+      * @param colocatedLocallyMapped {@code True} if transaction contains colocated key mapped to the local node.
+      */
+     public void colocatedLocallyMapped(boolean colocatedLocallyMapped) {
+         this.colocatedLocallyMapped = colocatedLocallyMapped;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) {
+         return entry.detached() || super.ownsLockUnsafe(entry);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException {
+         return entry.detached() || super.ownsLock(entry);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param map Map to put.
+      * @param retval Flag indicating whether a value should be returned.
+      * @return Future for put operation.
+      */
+     @SuppressWarnings("unchecked")
+     public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
+         GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         Map<? extends K, ? extends V> map,
+         boolean retval
+     ) {
+         return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+             entryTopVer,
+             map,
+             null,
+             null,
+             null,
+             retval);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param key Key.
+      * @param val Value.
+      * @param retval Return value flag.
+      * @param filter Filter.
+      * @return Future for put operation.
+      */
+     public final <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+         GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         K key,
+         V val,
+         boolean retval,
+         CacheEntryPredicate filter) {
+         return putAsync0(cacheCtx,
+             entryTopVer,
+             key,
+             val,
+             null,
+             null,
+             retval,
+             filter);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param key Key.
+      * @param entryProcessor Entry processor.
+      * @param invokeArgs Optional arguments for entry processor.
+      * @return Operation future.
+      */
+     public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         K key,
+         EntryProcessor<K, V, Object> entryProcessor,
+         Object... invokeArgs) {
+         return (IgniteInternalFuture)putAsync0(cacheCtx,
+             entryTopVer,
+             key,
+             null,
+             entryProcessor,
+             invokeArgs,
+             true,
+             null);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param map Entry processors map.
+      * @param invokeArgs Optional arguments for entry processor.
+      * @return Operation future.
+      */
+     @SuppressWarnings("unchecked")
+     public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+         GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
+         Object... invokeArgs
+     ) {
+         return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+             entryTopVer,
+             null,
+             map,
+             invokeArgs,
+             null,
+             true);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param drMap DR map to put.
+      * @return Future for DR put operation.
+      */
+     public IgniteInternalFuture<?> putAllDrAsync(
+         GridCacheContext cacheCtx,
+         Map<KeyCacheObject, GridCacheDrInfo> drMap
+     ) {
+         Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+             @Override public Object apply(GridCacheDrInfo val) {
+                 return val.value();
+             }
+         });
+ 
+         return this.<Object, Object>putAllAsync0(cacheCtx,
+             null,
+             map,
+             null,
+             null,
+             drMap,
+             false);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param drMap DR map.
+      * @return Future for asynchronous remove.
+      */
+     public IgniteInternalFuture<?> removeAllDrAsync(
+         GridCacheContext cacheCtx,
+         Map<KeyCacheObject, GridCacheVersion> drMap
+     ) {
+         return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param keys Keys to remove.
+      * @param retval Flag indicating whether a value should be returned.
+      * @param filter Filter.
+      * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+      * @return Future for asynchronous remove.
+      */
+     public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
+         GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         Collection<? extends K> keys,
+         boolean retval,
+         CacheEntryPredicate filter,
+         boolean singleRmv
+     ) {
+         return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv);
+     }
+ 
+     /**
+      * Internal method for single update operation.
+      *
+      * @param cacheCtx Cache context.
+      * @param key Key.
+      * @param val Value.
+      * @param entryProcessor Entry processor.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param retval Return value flag.
+      * @param filter Filter.
+      * @return Operation future.
+      */
+     private <K, V> IgniteInternalFuture putAsync0(
+         final GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         K key,
+         @Nullable V val,
+         @Nullable EntryProcessor<K, V, Object> entryProcessor,
+         @Nullable final Object[] invokeArgs,
+         final boolean retval,
+         @Nullable final CacheEntryPredicate filter
+     ) {
+         assert key != null;
+ 
+         try {
+             beforePut(cacheCtx, retval);
+ 
+             final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+ 
+             CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ 
+             final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+ 
+             KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+ 
+             boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+ 
+             final CacheEntryPredicate[] filters = CU.filterArray(filter);
+ 
+             final IgniteInternalFuture<Void> loadFut = enlistWrite(
+                 cacheCtx,
+                 entryTopVer,
+                 cacheKey,
+                 val,
+                 opCtx != null ? opCtx.expiry() : null,
+                 entryProcessor,
+                 invokeArgs,
+                 retval,
+                 /*lockOnly*/false,
+                 filters,
+                 ret,
+                 opCtx != null && opCtx.skipStore(),
+                 /*singleRmv*/false,
+                 keepBinary,
++                opCtx != null && opCtx.recovery(),
+                 dataCenterId);
+ 
+             if (pessimistic()) {
+                 assert loadFut == null || loadFut.isDone() : loadFut;
+ 
+                 if (loadFut != null)
+                     loadFut.get();
+ 
+                 final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey);
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Before acquiring transaction lock for put on key: " + enlisted);
+ 
+                 long timeout = remainingTime();
+ 
+                 if (timeout == -1)
+                     return new GridFinishedFuture<>(timeoutException());
+ 
+                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                     timeout,
+                     this,
+                     /*read*/entryProcessor != null, // Needed to force load from store.
+                     retval,
+                     isolation,
+                     isInvalidate(),
+                     -1L,
+                     -1L);
+ 
+                 PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                     @Override public GridCacheReturn postLock(GridCacheReturn ret)
+                         throws IgniteCheckedException
+                     {
+                         if (log.isDebugEnabled())
+                             log.debug("Acquired transaction lock for put on keys: " + enlisted);
+ 
+                         postLockWrite(cacheCtx,
+                             enlisted,
+                             ret,
+                             /*remove*/false,
+                             retval,
+                             /*read*/false,
+                             -1L,
+                             filters,
+                             /*computeInvoke*/true);
+ 
+                         return ret;
+                     }
+                 };
+ 
+                 if (fut.isDone()) {
+                     try {
+                         return nonInterruptable(plc1.apply(fut.get(), null));
+                     }
+                     catch (GridClosureException e) {
+                         return new GridFinishedFuture<>(e.unwrap());
+                     }
+                     catch (IgniteCheckedException e) {
+                         try {
+                             return nonInterruptable(plc1.apply(false, e));
+                         }
+                         catch (Exception e1) {
+                             return new GridFinishedFuture<>(e1);
+                         }
+                     }
+                 }
+                 else {
+                     return nonInterruptable(new GridEmbeddedFuture<>(
+                         fut,
+                         plc1
+                     ));
+                 }
+             }
+             else
+                 return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture(e);
+         }
+         catch (RuntimeException e) {
+             onException();
+ 
+             throw e;
+         }
+     }
+ 
+     /**
+      * Internal method for all put and transform operations. Only one of {@code map}, {@code invokeMap}
+      * maps must be non-null.
+      *
+      * @param cacheCtx Context.
+      * @param map Key-value map to store.
+      * @param invokeMap Invoke map.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param drMap DR map.
+      * @param retval Flag indicating whether a value should be returned.
+      * @return Operation future.
+      */
+     @SuppressWarnings("unchecked")
+     private <K, V> IgniteInternalFuture putAllAsync0(
+         final GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         @Nullable Map<? extends K, ? extends V> map,
+         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
+         @Nullable final Object[] invokeArgs,
+         @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
+         final boolean retval
+     ) {
+         try {
+             beforePut(cacheCtx, retval);
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture(e);
+         }
+ 
+         final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ 
+         final Byte dataCenterId;
+ 
+         if (opCtx != null && opCtx.hasDataCenterId()) {
+             assert drMap == null : drMap;
+             assert map != null || invokeMap != null;
+ 
+             dataCenterId = opCtx.dataCenterId();
+         }
+         else
+             dataCenterId = null;
+ 
+         // Cached entry may be passed only from entry wrapper.
+         final Map<?, ?> map0 = map;
+         final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+ 
+         if (log.isDebugEnabled())
+             log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+ 
+         assert map0 != null || invokeMap0 != null;
+ 
+         final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+ 
+         if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+             if (implicit())
+                 try {
+                     commit();
+                 }
+                 catch (IgniteCheckedException e) {
+                     return new GridFinishedFuture<>(e);
+                 }
+ 
+             return new GridFinishedFuture<>(ret.success(true));
+         }
+ 
+         try {
+             Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
+ 
+             final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
+ 
+             final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+ 
+             final IgniteInternalFuture<Void> loadFut = enlistWrite(
+                 cacheCtx,
+                 entryTopVer,
+                 keySet,
+                 opCtx != null ? opCtx.expiry() : null,
+                 map0,
+                 invokeMap0,
+                 invokeArgs,
+                 retval,
+                 false,
+                 CU.filterArray(null),
+                 ret,
+                 enlisted,
+                 drMap,
+                 null,
+                 opCtx != null && opCtx.skipStore(),
+                 false,
+                 keepBinary,
++                opCtx != null && opCtx.recovery(),
+                 dataCenterId);
+ 
+             if (pessimistic()) {
+                 assert loadFut == null || loadFut.isDone() : loadFut;
+ 
+                 if (loadFut != null) {
+                     try {
+                         loadFut.get();
+                     }
+                     catch (IgniteCheckedException e) {
+                         return new GridFinishedFuture(e);
+                     }
+                 }
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
+ 
+                 long timeout = remainingTime();
+ 
+                 if (timeout == -1)
+                     return new GridFinishedFuture<>(timeoutException());
+ 
+                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                     timeout,
+                     this,
+                     /*read*/invokeMap != null, // Needed to force load from store.
+                     retval,
+                     isolation,
+                     isInvalidate(),
+                     -1L,
+                     -1L);
+ 
+                 PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                     @Override public GridCacheReturn postLock(GridCacheReturn ret)
+                         throws IgniteCheckedException
+                     {
+                         if (log.isDebugEnabled())
+                             log.debug("Acquired transaction lock for put on keys: " + enlisted);
+ 
+                         postLockWrite(cacheCtx,
+                             enlisted,
+                             ret,
+                             /*remove*/false,
+                             retval,
+                             /*read*/false,
+                             -1L,
+                             CU.filterArray(null),
+                             /*computeInvoke*/true);
+ 
+                         return ret;
+                     }
+                 };
+ 
+                 if (fut.isDone()) {
+                     try {
+                         return nonInterruptable(plc1.apply(fut.get(), null));
+                     }
+                     catch (GridClosureException e) {
+                         return new GridFinishedFuture<>(e.unwrap());
+                     }
+                     catch (IgniteCheckedException e) {
+                         try {
+                             return nonInterruptable(plc1.apply(false, e));
+                         }
+                         catch (Exception e1) {
+                             return new GridFinishedFuture<>(e1);
+                         }
+                     }
+                 }
+                 else {
+                     return nonInterruptable(new GridEmbeddedFuture<>(
+                         fut,
+                         plc1
+                     ));
+                 }
+             }
+             else
+                 return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+         }
+         catch (RuntimeException e) {
+             onException();
+ 
+             throw e;
+         }
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param cacheKey Key to enlist.
+      * @param val Value.
+      * @param expiryPlc Explicitly specified expiry policy for entry.
+      * @param entryProcessor Entry processor (for invoke operation).
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param retval Flag indicating whether a value should be returned.
+      * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+      * @param filter User filters.
+      * @param ret Return value.
+      * @param skipStore Skip store flag.
+      * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+      * @return Future for entry values loading.
+      */
+     private <K, V> IgniteInternalFuture<Void> enlistWrite(
+         final GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         KeyCacheObject cacheKey,
+         Object val,
+         @Nullable ExpiryPolicy expiryPlc,
+         @Nullable EntryProcessor<K, V, Object> entryProcessor,
+         @Nullable Object[] invokeArgs,
+         final boolean retval,
+         boolean lockOnly,
+         final CacheEntryPredicate[] filter,
+         final GridCacheReturn ret,
+         boolean skipStore,
+         final boolean singleRmv,
+         boolean keepBinary,
++        boolean recovery,
+         Byte dataCenterId) {
+         try {
 -            addActiveCache(cacheCtx);
++            addActiveCache(cacheCtx, recovery);
+ 
+             final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+             final boolean needVal = singleRmv || retval || hasFilters;
+             final boolean needReadVer = needVal && (serializable() && optimistic());
+ 
+             if (entryProcessor != null)
+                 transform = true;
+ 
+             GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+ 
+             boolean loadMissed = enlistWriteEntry(cacheCtx,
+                 entryTopVer,
+                 cacheKey,
+                 val,
+                 entryProcessor,
+                 invokeArgs,
+                 expiryPlc,
+                 retval,
+                 lockOnly,
+                 filter,
+                 /*drVer*/drVer,
+                 /*drTtl*/-1L,
+                 /*drExpireTime*/-1L,
+                 ret,
+                 /*enlisted*/null,
+                 skipStore,
+                 singleRmv,
+                 hasFilters,
+                 needVal,
+                 needReadVer,
 -                keepBinary);
++                keepBinary,
++                recovery);
+ 
+             if (loadMissed) {
+                 AffinityTopologyVersion topVer = topologyVersionSnapshot();
+ 
+                 if (topVer == null)
+                     topVer = entryTopVer;
+ 
+                 return loadMissing(cacheCtx,
+                     topVer != null ? topVer : topologyVersion(),
+                     Collections.singleton(cacheKey),
+                     filter,
+                     ret,
+                     needReadVer,
+                     singleRmv,
+                     hasFilters,
+                     /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+                     retval,
+                     keepBinary,
++                    recovery,
+                     expiryPlc);
+             }
+ 
+             return new GridFinishedFuture<>();
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture<>(e);
+         }
+     }
+ 
+     /**
+      * Internal routine for <tt>putAll(..)</tt>
+      *
+      * @param cacheCtx Cache context.
+      * @param keys Keys to enlist.
+      * @param expiryPlc Explicitly specified expiry policy for entry.
+      * @param lookup Value lookup map ({@code null} for remove).
+      * @param invokeMap Map with entry processors for invoke operation.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param retval Flag indicating whether a value should be returned.
+      * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+      * @param filter User filters.
+      * @param ret Return value.
+      * @param enlisted Collection of keys enlisted into this transaction.
+      * @param drPutMap DR put map (optional).
+      * @param drRmvMap DR remove map (optional).
+      * @param skipStore Skip store flag.
+      * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+      * @param keepBinary Keep binary flag.
+      * @param dataCenterId Optional data center ID.
+      * @return Future for missing values loading.
+      */
+     private <K, V> IgniteInternalFuture<Void> enlistWrite(
+         final GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         Collection<?> keys,
+         @Nullable ExpiryPolicy expiryPlc,
+         @Nullable Map<?, ?> lookup,
+         @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
+         @Nullable Object[] invokeArgs,
+         final boolean retval,
+         boolean lockOnly,
+         final CacheEntryPredicate[] filter,
+         final GridCacheReturn ret,
+         Collection<KeyCacheObject> enlisted,
+         @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
+         @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
+         boolean skipStore,
+         final boolean singleRmv,
+         final boolean keepBinary,
++        final boolean recovery,
+         Byte dataCenterId
+     ) {
+         assert retval || invokeMap == null;
+ 
+         try {
 -            addActiveCache(cacheCtx);
++            addActiveCache(cacheCtx, recovery);
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture<>(e);
+         }
+ 
+         boolean rmv = lookup == null && invokeMap == null;
+ 
+         final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+         final boolean needVal = singleRmv || retval || hasFilters;
+         final boolean needReadVer = needVal && (serializable() && optimistic());
+ 
+         try {
+             // Set transform flag for transaction.
+             if (invokeMap != null)
+                 transform = true;
+ 
+             Set<KeyCacheObject> missedForLoad = null;
+ 
+             for (Object key : keys) {
+                 if (key == null) {
+                     rollback();
+ 
+                     throw new NullPointerException("Null key.");
+                 }
+ 
+                 Object val = rmv || lookup == null ? null : lookup.get(key);
+                 EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
+ 
+                 GridCacheVersion drVer;
+                 long drTtl;
+                 long drExpireTime;
+ 
+                 if (drPutMap != null) {
+                     GridCacheDrInfo info = drPutMap.get(key);
+ 
+                     assert info != null;
+ 
+                     drVer = info.version();
+                     drTtl = info.ttl();
+                     drExpireTime = info.expireTime();
+                 }
+                 else if (drRmvMap != null) {
+                     assert drRmvMap.get(key) != null;
+ 
+                     drVer = drRmvMap.get(key);
+                     drTtl = -1L;
+                     drExpireTime = -1L;
+                 }
+                 else if (dataCenterId != null) {
+                     drVer = cctx.versions().next(dataCenterId);
+                     drTtl = -1L;
+                     drExpireTime = -1L;
+                 }
+                 else {
+                     drVer = null;
+                     drTtl = -1L;
+                     drExpireTime = -1L;
+                 }
+ 
+                 if (!rmv && val == null && entryProcessor == null) {
+                     setRollbackOnly();
+ 
+                     throw new NullPointerException("Null value.");
+                 }
+ 
+                 KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+ 
+                 boolean loadMissed = enlistWriteEntry(cacheCtx,
+                     entryTopVer,
+                     cacheKey,
+                     val,
+                     entryProcessor,
+                     invokeArgs,
+                     expiryPlc,
+                     retval,
+                     lockOnly,
+                     filter,
+                     drVer,
+                     drTtl,
+                     drExpireTime,
+                     ret,
+                     enlisted,
+                     skipStore,
+                     singleRmv,
+                     hasFilters,
+                     needVal,
+                     needReadVer,
 -                    keepBinary);
++                    keepBinary,
++                    recovery);
+ 
+                 if (loadMissed) {
+                     if (missedForLoad == null)
+                         missedForLoad = new HashSet<>();
+ 
+                     missedForLoad.add(cacheKey);
+                 }
+             }
+ 
+             if (missedForLoad != null) {
+                 AffinityTopologyVersion topVer = topologyVersionSnapshot();
+ 
+                 if (topVer == null)
+                     topVer = entryTopVer;
+ 
+                 return loadMissing(cacheCtx,
+                     topVer != null ? topVer : topologyVersion(),
+                     missedForLoad,
+                     filter,
+                     ret,
+                     needReadVer,
+                     singleRmv,
+                     hasFilters,
+                     /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+                     retval,
+                     keepBinary,
++                    recovery,
+                     expiryPlc);
+             }
+ 
+             return new GridFinishedFuture<>();
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture<>(e);
+         }
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param cacheKey Key.
+      * @param val Value.
+      * @param entryProcessor Entry processor.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param expiryPlc Explicitly specified expiry policy for entry.
+      * @param retval Return value flag.
+      * @param lockOnly Lock only flag.
+      * @param filter Filter.
+      * @param drVer DR version.
+      * @param drTtl DR ttl.
+      * @param drExpireTime DR expire time.
+      * @param ret Return value.
+      * @param enlisted Enlisted keys collection.
+      * @param skipStore Skip store flag.
+      * @param singleRmv {@code True} for single remove operation.
+      * @param hasFilters {@code True} if filters not empty.
+      * @param needVal {@code True} if value is needed.
+      * @param needReadVer {@code True} if need read entry version.
+      * @return {@code True} if entry value should be loaded.
+      * @throws IgniteCheckedException If failed.
+      */
+     private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         final KeyCacheObject cacheKey,
+         @Nullable final Object val,
+         @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
+         @Nullable final Object[] invokeArgs,
+         @Nullable final ExpiryPolicy expiryPlc,
+         final boolean retval,
+         final boolean lockOnly,
+         final CacheEntryPredicate[] filter,
+         final GridCacheVersion drVer,
+         final long drTtl,
+         long drExpireTime,
+         final GridCacheReturn ret,
+         @Nullable final Collection<KeyCacheObject> enlisted,
+         boolean skipStore,
+         boolean singleRmv,
+         boolean hasFilters,
+         final boolean needVal,
+         boolean needReadVer,
 -        boolean keepBinary
++        boolean keepBinary,
++        boolean recovery
+     ) throws IgniteCheckedException {
+         boolean loadMissed = false;
+ 
+         final boolean rmv = val == null && entryProcessor == null;
+ 
+         IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+ 
+         IgniteTxEntry txEntry = entry(txKey);
+ 
+         // First time access.
+         if (txEntry == null) {
+             while (true) {
+                 GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
+ 
+                 try {
+                     entry.unswap(false);
+ 
+                     // Check if lock is being explicitly acquired by the same thread.
+                     if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+                         entry.lockedByThread(threadId, xidVer)) {
+                         throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+                             "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+                             ", entry=" + entry +
+                             ", xidVer=" + xidVer +
+                             ", threadId=" + threadId +
+                             ", locNodeId=" + cctx.localNodeId() + ']');
+                     }
+ 
+                     CacheObject old = null;
+                     GridCacheVersion readVer = null;
+ 
+                     if (optimistic() && !implicit()) {
+                         try {
+                             if (needReadVer) {
+                                 EntryGetResult res = primaryLocal(entry) ?
+                                     entry.innerGetVersioned(
+                                         null,
+                                         this,
 -                                        /*swap*/false,
 -                                        /*unmarshal*/retval || needVal,
+                                         /*metrics*/retval,
+                                         /*events*/retval,
+                                         CU.subjectId(this, cctx),
+                                         entryProcessor,
+                                         resolveTaskName(),
+                                         null,
+                                         keepBinary,
+                                         null) : null;
+ 
+                                 if (res != null) {
+                                     old = res.value();
+                                     readVer = res.version();
+                                 }
+                             }
+                             else {
+                                 old = entry.innerGet(
+                                     null,
+                                     this,
 -                                    /*swap*/false,
 -                                    /*read-through*/false,
+                                     /*metrics*/retval,
+                                     /*events*/retval,
+                                     /*temporary*/false,
+                                     CU.subjectId(this, cctx),
+                                     entryProcessor,
+                                     resolveTaskName(),
+                                     null,
+                                     keepBinary);
+                             }
+                         }
+                         catch (ClusterTopologyCheckedException e) {
+                             entry.context().evicts().touch(entry, topologyVersion());
+ 
+                             throw e;
+                         }
+                     }
+                     else
 -                        old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
++                        old = entry.rawGet();
+ 
+                     final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+                         entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+ 
+                     if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+                         ret.set(cacheCtx, old, false, keepBinary);
+ 
+                         if (!readCommitted()) {
+                             if (optimistic() && serializable()) {
+                                 txEntry = addEntry(op,
+                                     old,
+                                     entryProcessor,
+                                     invokeArgs,
+                                     entry,
+                                     expiryPlc,
+                                     filter,
+                                     true,
+                                     drTtl,
+                                     drExpireTime,
+                                     drVer,
+                                     skipStore,
+                                     keepBinary);
+                             }
+                             else {
+                                 txEntry = addEntry(READ,
+                                     old,
+                                     null,
+                                     null,
+                                     entry,
+                                     null,
+                                     CU.empty0(),
+                                     false,
+                                     -1L,
+                                     -1L,
+                                     null,
+                                     skipStore,
+                                     keepBinary);
+                             }
+ 
+                             txEntry.markValid();
+ 
+                             if (needReadVer) {
+                                 assert readVer != null;
+ 
+                                 txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                             }
+                         }
+ 
+                         if (readCommitted())
+                             cacheCtx.evicts().touch(entry, topologyVersion());
+ 
+                         break; // While.
+                     }
+ 
+                     txEntry = addEntry(op,
+                         cacheCtx.toCacheObject(val),
+                         entryProcessor,
+                         invokeArgs,
+                         entry,
+                         expiryPlc,
+                         filter,
+                         true,
+                         drTtl,
+                         drExpireTime,
+                         drVer,
+                         skipStore,
+                         keepBinary);
+ 
 -                    if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
++                    if (!implicit() && readCommitted())
+                         cacheCtx.evicts().touch(entry, topologyVersion());
+ 
+                     if (enlisted != null)
+                         enlisted.add(cacheKey);
+ 
+                     if (!pessimistic() && !implicit()) {
+                         txEntry.markValid();
+ 
+                         if (old == null) {
+                             if (needVal)
+                                 loadMissed = true;
+                             else {
+                                 assert !implicit() || !transform : this;
+                                 assert txEntry.op() != TRANSFORM : txEntry;
+ 
+                                 if (retval)
+                                     ret.set(cacheCtx, null, true, keepBinary);
+                                 else
+                                     ret.success(true);
+                             }
+                         }
+                         else {
+                             if (needReadVer) {
+                                 assert readVer != null;
+ 
+                                 txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                             }
+ 
+                             if (retval && !transform)
+                                 ret.set(cacheCtx, old, true, keepBinary);
+                             else {
+                                 if (txEntry.op() == TRANSFORM) {
+                                     GridCacheVersion ver;
+ 
+                                     try {
+                                         ver = entry.version();
+                                     }
+                                     catch (GridCacheEntryRemovedException ex) {
+                                         assert optimistic() : txEntry;
+ 
+                                         if (log.isDebugEnabled())
+                                             log.debug("Failed to get entry version " +
+                                                 "[err=" + ex.getMessage() + ']');
+ 
+                                         ver = null;
+                                     }
+ 
+                                     addInvokeResult(txEntry, old, ret, ver);
+                                 }
+                                 else
+                                     ret.success(true);
+                             }
+                         }
+                     }
+                     // Pessimistic.
+                     else {
+                         if (retval && !transform)
+                             ret.set(cacheCtx, old, true, keepBinary);
+                         else
+                             ret.success(true);
+                     }
+ 
+                     break; // While.
+                 }
+                 catch (GridCacheEntryRemovedException ignore) {
+                     if (log.isDebugEnabled())
+                         log.debug("Got removed entry in transaction putAll0 method: " + entry);
+                 }
+             }
+         }
+         else {
+             if (entryProcessor == null && txEntry.op() == TRANSFORM)
+                 throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+                     "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
+ 
+             GridCacheEntryEx entry = txEntry.cached();
+ 
+             CacheObject v = txEntry.value();
+ 
+             boolean del = txEntry.op() == DELETE && rmv;
+ 
+             if (!del) {
+                 if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
+                     ret.set(cacheCtx, v, false, keepBinary);
+ 
+                     return loadMissed;
+                 }
+ 
+                 GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+                     v != null ? UPDATE : CREATE;
+ 
+                 txEntry = addEntry(op,
+                     cacheCtx.toCacheObject(val),
+                     entryProcessor,
+                     invokeArgs,
+                     entry,
+                     expiryPlc,
+                     filter,
+                     true,
+                     drTtl,
+                     drExpireTime,
+                     drVer,
+                     skipStore,
+                     keepBinary);
+ 
+                 if (enlisted != null)
+                     enlisted.add(cacheKey);
+ 
+                 if (txEntry.op() == TRANSFORM) {
+                     GridCacheVersion ver;
+ 
+                     try {
+                         ver = entry.version();
+                     }
+                     catch (GridCacheEntryRemovedException e) {
+                         assert optimistic() : txEntry;
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+ 
+                         ver = null;
+                     }
+ 
+                     addInvokeResult(txEntry, txEntry.value(), ret, ver);
+                 }
+             }
+ 
+             if (!pessimistic()) {
+                 txEntry.markValid();
+ 
+                 if (retval && !transform)
+                     ret.set(cacheCtx, v, true, keepBinary);
+                 else
+                     ret.success(true);
+             }
+         }
+ 
+         return loadMissed;
+     }
+ 
+     /**
+      * Enlists the given keys for removal in this transaction and returns a future for the operation result.
+      * <p>
+      * The keys are enlisted through {@code enlistWrite(...)} with no lookup map and no invoke map. For a
+      * pessimistic transaction the enlisted keys are then locked via {@code txLockAsync(...)} and
+      * {@code postLockWrite(...)} is called with the remove flag set from the post-lock closure. For a
+      * non-pessimistic implicit transaction the transaction is committed via {@code commitNearTxLocalAsync()}
+      * and {@code rollbackNearTxLocalAsync()} is called if the commit future fails. Otherwise the result is
+      * returned once the load future produced by {@code enlistWrite(...)} completes.
+      * <p>
+      * Failures of {@code checkUpdatesAllowed(...)} and {@code checkValid()}, a remaining time of {@code -1}
+      * and a failed load future in pessimistic mode are reported through the returned future rather than thrown.
+      * The expiry policy of the per-call operation context is passed to {@code enlistWrite(...)} only when the
+      * filter array built from {@code filter} is not empty.
+      *
+      * @param cacheCtx Cache context.
+      * @param entryTopVer Topology version passed through to {@code enlistWrite(...)}, may be {@code null}.
+      * @param keys Keys to remove. Asserted to be {@code null} when {@code drMap} is provided.
+      * @param drMap DR map. When not {@code null}, its key set is used as the keys to remove.
+      * @param retval Flag indicating whether a value should be returned.
+      * @param filter Filter. Converted to an array with {@code CU.filterArray(...)} before use.
+      * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+      * @return Future for asynchronous remove. When there are no keys the future is already finished with a
+      *      successful result (for an implicit transaction, after {@code commit()} has been called).
+      */
+     @SuppressWarnings("unchecked")
+     private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0(
+         final GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         @Nullable final Collection<? extends K> keys,
+         @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
+         final boolean retval,
+         @Nullable final CacheEntryPredicate filter,
+         boolean singleRmv) {
+         try {
+             checkUpdatesAllowed(cacheCtx);
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture(e);
+         }
+ 
+         cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+ 
+         if (retval)
+             needReturnValue(true);
+ 
+         final Collection<?> keys0;
+ 
+         if (drMap != null) {
+             assert keys == null;
+ 
+             keys0 = drMap.keySet();
+         }
+         else
+             keys0 = keys;
+ 
+         CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ 
+         final Byte dataCenterId;
+ 
+         if (opCtx != null && opCtx.hasDataCenterId()) {
+             assert drMap == null : drMap;
+ 
+             dataCenterId = opCtx.dataCenterId();
+         }
+         else
+             dataCenterId = null;
+ 
+         assert keys0 != null;
+ 
+         if (log.isDebugEnabled())
+             log.debug(S.toString("Called removeAllAsync(...)",
+                 "tx", this, false,
+                 "keys", keys0, true,
+                 "implicit", implicit, false,
+                 "retval", retval, false));
+ 
+         try {
+             checkValid();
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture<>(e);
+         }
+ 
+         final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+ 
+         if (F.isEmpty(keys0)) {
+             if (implicit()) {
+                 try {
+                     commit();
+                 }
+                 catch (IgniteCheckedException e) {
+                     return new GridFinishedFuture<>(e);
+                 }
+             }
+ 
+             return new GridFinishedFuture<>(ret.success(true));
+         }
+ 
+         init();
+ 
+         final Collection<KeyCacheObject> enlisted = new ArrayList<>();
+ 
+         ExpiryPolicy plc;
+ 
+         final CacheEntryPredicate[] filters = CU.filterArray(filter);
+ 
+         if (!F.isEmpty(filters))
+             plc = opCtx != null ? opCtx.expiry() : null;
+         else
+             plc = null;
+ 
+         final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+ 
+         final IgniteInternalFuture<Void> loadFut = enlistWrite(
+             cacheCtx,
+             entryTopVer,
+             keys0,
+             plc,
+             /*lookup map*/null,
+             /*invoke map*/null,
+             /*invoke arguments*/null,
+             retval,
+             /*lock only*/false,
+             filters,
+             ret,
+             enlisted,
+             null,
+             drMap,
+             opCtx != null && opCtx.skipStore(),
+             singleRmv,
+             keepBinary,
++            opCtx != null && opCtx.recovery(),
+             dataCenterId
+         );
+ 
+         if (log.isDebugEnabled())
+             log.debug("Remove keys: " + enlisted);
+ 
+         // Acquire locks only after having added operation to the write set.
+         // Otherwise, during rollback we will not know whether locks need
+         // to be rolled back.
+         if (pessimistic()) {
+             assert loadFut == null || loadFut.isDone() : loadFut;
+ 
+             if (loadFut != null) {
+                 try {
+                     loadFut.get();
+                 }
+                 catch (IgniteCheckedException e) {
+                     return new GridFinishedFuture<>(e);
+                 }
+             }
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
+ 
+             long timeout = remainingTime();
+ 
+             if (timeout == -1)
+                 return new GridFinishedFuture<>(timeoutException());
+ 
+             IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                 timeout,
+                 this,
+                 false,
+                 retval,
+                 isolation,
+                 isInvalidate(),
+                 -1L,
+                 -1L);
+ 
+             PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                 @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+                     throws IgniteCheckedException
+                 {
+                     if (log.isDebugEnabled())
+                         log.debug("Acquired transaction lock for remove on keys: " + enlisted);
+ 
+                     postLockWrite(cacheCtx,
+                         enlisted,
+                         ret,
+                         /*remove*/true,
+                         retval,
+                         /*read*/false,
+                         -1L,
+                         filters,
+                         /*computeInvoke*/false);
+ 
+                     return ret;
+                 }
+             };
+ 
+             if (fut.isDone()) {
+                 try {
+                     return nonInterruptable(plc1.apply(fut.get(), null));
+                 }
+                 catch (GridClosureException e) {
+                     return new GridFinishedFuture<>(e.unwrap());
+                 }
+                 catch (IgniteCheckedException e) {
+                     try {
+                         return nonInterruptable(plc1.apply(false, e));
+                     }
+                     catch (Exception e1) {
+                         return new GridFinishedFuture<>(e1);
+                     }
+                 }
+             }
+             else
+                 return nonInterruptable(new GridEmbeddedFuture<>(
+                     fut,
+                     plc1
+                 ));
+         }
+         else {
+             if (implicit()) {
+                 // Should never load missing values for implicit transaction as values will be returned
+                 // with prepare response, if required.
+                 assert loadFut.isDone();
+ 
+                 return nonInterruptable(commitNearTxLocalAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                     @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                         throws IgniteCheckedException {
+                         try {
+                             txFut.get();
+ 
+                             return new GridCacheReturn(cacheCtx, true, keepBinary,
+                                 implicitRes.value(), implicitRes.success());
+                         }
+                         catch (IgniteCheckedException | RuntimeException e) {
+                             rollbackNearTxLocalAsync();
+ 
+                             throw e;
+                         }
+                     }
+                 }));
+             }
+             else {
+                 return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+                     @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+                         throws IgniteCheckedException {
+                         f.get();
+ 
+                         return ret;
+                     }
+                 }));
+             }
+         }
+     }
+ 
+     /**
+      * Reads values for the given keys within this transaction.
+      * <p>
+      * Keys are first enlisted through {@code enlistRead(...)}, which is handed the result map and the map of
+      * missed keys. For a pessimistic transaction that is not read-committed (and when values are not skipped)
+      * locks on the keys returned by {@code enlistRead(...)} are requested via {@code txLockAsync(...)}; the
+      * post-lock closure then reads values from the cache entries referenced by the transaction entries, and
+      * keys that are still missed are passed to {@code checkMissed(...)} only if the cache is local. In all
+      * other cases keys that are still missed are passed to {@code checkMissed(...)} directly (for a
+      * transaction that is not read-committed, missed keys already present in the result map are dropped first).
+      * <p>
+      * If an {@code IgniteCheckedException} is caught while processing, the transaction is marked rollback-only
+      * and the exception is returned through a finished future.
+      *
+      * @param cacheCtx Cache context.
+      * @param entryTopVer Topology version passed to {@code enlistRead(...)} and used for
+      *      {@code checkMissed(...)} when {@code topologyVersionSnapshot()} is {@code null}; may be {@code null}.
+      * @param keys Keys to get.
+      * @param deserializeBinary Deserialize binary flag.
+      * @param skipVals Skip values flag.
+      * @param keepCacheObjects Keep cache objects flag.
+      * @param skipStore Skip store flag.
+      * @param recovery Recovery flag, passed through to {@code enlistRead(...)} and {@code checkMissed(...)}.
+      * @param needVer If {@code true}, the post-lock closure reads entries with {@code innerGetVersioned(...)}
+      *      instead of {@code innerGet(...)}; the flag is also passed to {@code enlistRead(...)},
+      *      {@code checkMissed(...)} and {@code addResult(...)}.
+      * @return Future for this get. Already finished with an empty map when {@code keys} is empty.
+      */
+     @SuppressWarnings("unchecked")
+     public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
+         final GridCacheContext cacheCtx,
+         @Nullable final AffinityTopologyVersion entryTopVer,
+         Collection<KeyCacheObject> keys,
+         final boolean deserializeBinary,
+         final boolean skipVals,
+         final boolean keepCacheObjects,
+         final boolean skipStore,
++        final boolean recovery,
+         final boolean needVer) {
+         if (F.isEmpty(keys))
+             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+ 
+         init();
+ 
+         int keysCnt = keys.size();
+ 
+         boolean single = keysCnt == 1;
+ 
+         try {
+             checkValid();
+ 
+             final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+ 
+             final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+ 
+             CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ 
+             ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+ 
+             final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
+                 entryTopVer,
+                 keys,
+                 expiryPlc,
+                 retMap,
+                 missed,
+                 keysCnt,
+                 deserializeBinary,
+                 skipVals,
+                 keepCacheObjects,
+                 skipStore,
++                recovery,
+                 needVer);
+ 
+             if (single && missed.isEmpty())
+                 return new GridFinishedFuture<>(retMap);
+ 
+             // Handle locks.
+             if (pessimistic() && !readCommitted() && !skipVals) {
+                 if (expiryPlc == null)
+                     expiryPlc = cacheCtx.expiry();
+ 
+                 long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
+                 long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
+ 
+                 long timeout = remainingTime();
+ 
+                 if (timeout == -1)
+                     return new GridFinishedFuture<>(timeoutException());
+ 
+                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
+                     timeout,
+                     this,
+                     true,
+                     true,
+                     isolation,
+                     isInvalidate(),
+                     createTtl,
+                     accessTtl);
+ 
+                 final ExpiryPolicy expiryPlc0 = expiryPlc;
+ 
+                 PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+                     @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+                         if (log.isDebugEnabled())
+                             log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+ 
+                         // Load keys only after the locks have been acquired.
+                         for (KeyCacheObject cacheKey : lockKeys) {
+                             K keyVal = (K)
+                                 (keepCacheObjects ? cacheKey :
+                                     cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
+ 
+                             if (retMap.containsKey(keyVal))
+                                 // We already have a return value.
+                                 continue;
+ 
+                             IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+ 
+                             IgniteTxEntry txEntry = entry(txKey);
+ 
+                             assert txEntry != null;
+ 
+                             // Check if there is cached value.
+                             while (true) {
+                                 GridCacheEntryEx cached = txEntry.cached();
+ 
+                                 CacheObject val = null;
+                                 GridCacheVersion readVer = null;
+                                 EntryGetResult getRes = null;
+ 
+                                 try {
+                                     Object transformClo =
+                                         (!F.isEmpty(txEntry.entryProcessors()) &&
+                                             cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                             F.first(txEntry.entryProcessors()) : null;
+ 
+                                     if (needVer) {
+                                         getRes = cached.innerGetVersioned(
+                                             null,
+                                             GridNearTxLocal.this,
 -                                            /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
 -                                            /*unmarshal*/true,
+                                             /*update-metrics*/true,
+                                             /*event*/!skipVals,
+                                             CU.subjectId(GridNearTxLocal.this, cctx),
+                                             transformClo,
+                                             resolveTaskName(),
+                                             null,
+                                             txEntry.keepBinary(),
+                                             null);
+ 
+                                         if (getRes != null) {
+                                             val = getRes.value();
+                                             readVer = getRes.version();
+                                         }
+                                     }
+                                     else{
+                                         val = cached.innerGet(
+                                             null,
+                                             GridNearTxLocal.this,
 -                                            cacheCtx.isSwapOrOffheapEnabled(),
 -                                            /*read-through*/false,
+                                             /*metrics*/true,
+                                             /*events*/!skipVals,
+                                             /*temporary*/false,
+                                             CU.subjectId(GridNearTxLocal.this, cctx),
+                                             transformClo,
+                                             resolveTaskName(),
+                                             null,
+                                             txEntry.keepBinary());
+                                     }
+ 
+                                     // If value is in cache and passed the filter.
+                                     if (val != null) {
+                                         missed.remove(cacheKey);
+ 
+                                         txEntry.setAndMarkValid(val);
+ 
+                                         if (!F.isEmpty(txEntry.entryProcessors()))
+                                             val = txEntry.applyEntryProcessors(val);
+ 
+                                         cacheCtx.addResult(retMap,
+                                             cacheKey,
+                                             val,
+                                             skipVals,
+                                             keepCacheObjects,
+                                             deserializeBinary,
+                                             false,
+                                             getRes,
+                                             readVer,
+                                             0,
+                                             0,
+                                             needVer);
+ 
+                                         if (readVer != null)
+                                             txEntry.entryReadVersion(readVer);
+                                     }
+ 
+                                     // Even though we bring the value back from lock acquisition,
+                                     // we still need to recheck primary node for consistent values
+                                     // in case of concurrent transactional locks.
+ 
+                                     break; // While.
+                                 }
+                                 catch (GridCacheEntryRemovedException ignore) {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Got removed exception in get postLock (will retry): " +
+                                             cached);
+ 
+                                     txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
+                                 }
+                             }
+                         }
+ 
+                         if (!missed.isEmpty() && cacheCtx.isLocal()) {
+                             AffinityTopologyVersion topVer = topologyVersionSnapshot();
+ 
+                             if (topVer == null)
+                                 topVer = entryTopVer;
+ 
+                             return checkMissed(cacheCtx,
+                                 topVer != null ? topVer : topologyVersion(),
+                                 retMap,
+                                 missed,
+                                 deserializeBinary,
+                                 skipVals,
+                                 keepCacheObjects,
+                                 skipStore,
++                                recovery,
+                                 needVer,
+                                 expiryPlc0);
+                         }
+ 
+                         return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+                     }
+                 };
+ 
+                 FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+                     @Override Map<K, V> finish(Map<K, V> loaded) {
+                         retMap.putAll(loaded);
+ 
+                         return retMap;
+                     }
+                 };
+ 
+                 if (fut.isDone()) {
+                     try {
+                         IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+ 
+                         return fut1.isDone() ?
+                             new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
+                             new GridEmbeddedFuture<>(finClos, fut1);
+                     }
+                     catch (GridClosureException e) {
+                         return new GridFinishedFuture<>(e.unwrap());
+                     }
+                     catch (IgniteCheckedException e) {
+                         try {
+                             return plc2.apply(false, e);
+                         }
+                         catch (Exception e1) {
+                             return new GridFinishedFuture<>(e1);
+                         }
+                     }
+                 }
+                 else {
+                     return new GridEmbeddedFuture<>(
+                         fut,
+                         plc2,
+                         finClos);
+                 }
+             }
+             else {
+                 assert optimistic() || readCommitted() || skipVals;
+ 
+                 if (!missed.isEmpty()) {
+                     if (!readCommitted())
+                         for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
+                             KeyCacheObject cacheKey = it.next();
+ 
+                             K keyVal =
+                                 (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
+ 
+                             if (retMap.containsKey(keyVal))
+                                 it.remove();
+                         }
+ 
+                     if (missed.isEmpty())
+                         return new GridFinishedFuture<>(retMap);
+ 
+                     AffinityTopologyVersion topVer = topologyVersionSnapshot();
+ 
+                     if (topVer == null)
+                         topVer = entryTopVer;
+ 
+                     return checkMissed(cacheCtx,
+                         topVer != null ? topVer : topologyVersion(),
+                         retMap,
+                         missed,
+                         deserializeBinary,
+                         skipVals,
+                         keepCacheObjects,
+                         skipStore,
++                        recovery,
+                         needVer,
+                         expiryPlc);
+                 }
+ 
+                 return new GridFinishedFuture<>(retMap);
+             }
+         }
+         catch (IgniteCheckedException e) {
+             setRollbackOnly();
+ 
+             return new GridFinishedFuture<>(e);
+         }
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param keys Keys to enlist.
+      * @param expiryPlc Explicitly specified expiry policy for entry.
+      * @param map Return map.
+      * @param missed Map of missed keys.
+      * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+      * @param deserializeBinary Deserialize binary flag.
+      * @param skipVals Skip values flag.
+      * @param keepCacheObjects Keep cache objects flag.
+      * @param skipStore Skip store flag.
+      * @throws IgniteCheckedException If failed.
+      * @return Enlisted keys.
+      */
+     @SuppressWarnings({"RedundantTypeArguments"})
+     private <K, V> Collection<KeyCacheObject> enlistRead(
+         final GridCacheContext cacheCtx,
+         @Nullable AffinityTopologyVersion entryTopVer,
+         Collection<KeyCacheObject> keys,
+         @Nullable ExpiryPolicy expiryPlc,
+         Map<K, V> map,
+         Map<KeyCacheObject, GridCacheVersion> missed,
+         int keysCnt,
+         boolean deserializeBinary,
+         boolean skipVals,
+         boolean keepCacheObjects,
+         boolean skipStore,
++        boolean recovery,
+         final boolean needVer
+     ) throws IgniteCheckedException {
+         assert !F.isEmpty(keys);
+         assert keysCnt == keys.size();
+ 
+         cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
+ 
+         boolean single = keysCnt == 1;
+ 
+         Collection<KeyCacheObject> lockKeys = null;
+ 
+         AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
+ 
+         boolean needReadVer = (serializable() && optimistic()) || needVer;
+ 
+         // In this loop we cover only read-committed or optimistic transactions.
+         // Transactions that are pessimistic and not read-committed are covered
+         // outside of this loop.
+         for (KeyCacheObject key : keys) {
+             if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
 -                addActiveCache(cacheCtx);
++                addActiveCache(cacheCtx, recovery);
+ 
+             IgniteTxKey txKey = cacheCtx.txKey(key);
+ 
+             // Check write map (always check writes first).
+             IgniteTxEntry txEntry = entry(txKey);
+ 
+             // Either non-read-committed or there was a previous write.
+             if (txEntry != null) {
+                 CacheObject val = txEntry.value();
+ 
+                 if (txEntry.hasValue()) {
+                     if (!F.isEmpty(txEntry.entryProcessors()))
+                         val = txEntry.applyEntryProcessors(val);
+ 
+                     if (val != null) {
+                         GridCacheVersion ver = null;
+ 
+                         if (needVer) {
+                             if (txEntry.op() != READ)
+                                 ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+                             else {
+                                 ver = txEntry.entryReadVersion();
+ 
+                                 if (ver == null && pessimistic()) {
+                                     while (true) {
+                                         try {
+                                             GridCacheEntryEx cached = txEntry.cached();
+ 
+                                             ver = cached.isNear() ?
+                                                 ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+ 
+                                             break;
+                                         }
+                                         catch (GridCacheEntryRemovedException ignored) {
+                                             txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+                                         }
+                                     }
+                                 }
+ 
+                                 if (ver == null) {
+                                     assert optimistic() && repeatableRead() : this;
+ 
+                                     ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+                                 }
+                             }
+ 
+                             assert ver != null;
+                         }
+ 
+                         cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
+                             ver, 0, 0);
+                     }
+                 }
+                 else {
+                     assert txEntry.op() == TRANSFORM;
+ 
+                     while (true) {
+                         try {
+                             GridCacheVersion readVer = null;
+                             EntryGetResult getRes = null;
+ 
+                             Object transformClo =
+                                 (txEntry.op() == TRANSFORM &&
+                                     cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                     F.first(txEntry.entryProcessors()) : null;
+ 
+                             if (needVer) {
+                                 getRes = txEntry.cached().innerGetVersioned(
+                                     null,
+                                     this,
 -                                    /*swap*/true,
 -                                    /*unmarshal*/true,
+                                     /*update-metrics*/true,
+                                     /*event*/!skipVals,
+                                     CU.subjectId(this, cctx),
+                                     transformClo,
+                                     resolveTaskName(),
+                                     null,
+                                     txEntry.keepBinary(),
+                                     null);
+ 
+                                 if (getRes != null) {
+                                     val = getRes.value();
+                                     readVer = getRes.version();
+                                 }
+                             }
+                             else {
+                                 val = txEntry.cached().innerGet(
+                                     null,
+                                     this,
 -                                    /*swap*/true,
 -                                    /*read-through*/false,
+                                     /*metrics*/true,
+                                     /*event*/!skipVals,
+                                     /*temporary*/false,
+                                     CU.subjectId(this, cctx),
+                                     transformClo,
+                                     resolveTaskName(),
+                                     null,
+                                     txEntry.keepBinary());
+                             }
+ 
+                             if (val != null) {
+                                 if (!readCommitted() && !skipVals)
+                                     txEntry.readValue(val);
+ 
+                                 if (!F.isEmpty(txEntry.entryProcessors()))
+                                     val = txEntry.applyEntryProcessors(val);
+ 
+                                 cacheCtx.addResult(map,
+                                     key,
+                                     val,
+                                     skipVals,
+                                     keepCacheObjects,
+                                     deserializeBinary,
+                                     false,
+                                     getRes,
+                                     readVer,
+                                     0,
+                                     0,
+                                     needVer);
+                             }
+                             else
+                                 missed.put(key, txEntry.cached().version());
+ 
+                             break;
+                         }
+                         catch (GridCacheEntryRemovedException ignored) {
+                             txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+                         }
+                     }
+                 }
+             }
+             // First time access within transaction.
+             else {
+                 if (lockKeys == null && !skipVals)
+                     lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
+ 
+                 if (!single && !skipVals)
+                     lockKeys.add(key);
+ 
+                 while (true) {
+                     GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+ 
+                     try {
+                         GridCacheVersion ver = entry.version();
+ 
+                         CacheObject val = null;
+                         GridCacheVersion readVer = null;
+                         EntryGetResult getRes = null;
+ 
+                         if (!pessimistic() || readCommitted() && !skipVals) {
+                             IgniteCacheExpiryPolicy accessPlc =
+                                 optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
+ 
+                             if (needReadVer) {
+                                 getRes = primaryLocal(entry) ?
+                                     entry.innerGetVersioned(
+                                         null,
+                                         this,
 -                                        /*swap*/true,
 -                                        /*unmarshal*/true,
+                                         /*metrics*/true,
+                                         /*event*/true,
+                                         CU.subjectId(this, cctx),
+                                         null,
+                                         resolveTaskName(),
+                                         accessPlc,
+                                         !deserializeBinary,
+                                         null) : null;
+ 
+                                 if (getRes != null) {
+                                     val = getRes.value();
+                                     readVer = getRes.version();
+                                 }
+                             }
+                             else {
+                                 val = entry.innerGet(
+                                     null,
+                                     this,
 -                                    /*swap*/true,
 -                                    /*read-through*/false,
+                                     /*metrics*/true,
+                                     /*event*/true,
+                                     /*temporary*/false,
+                                     CU.subjectId(this, cctx),
+                                     null,
+                                     resolveTaskName(),
+                                     accessPlc,
+                                     !deserializeBinary);
+                             }
+ 
+                             if (val != null) {
+                                 cacheCtx.addResult(map,
+                                     key,
+                                     val,
+                                     skipVals,
+                                     keepCacheObjects,
+                                     deserializeBinary,
+                                     false,
+                                     getRes,
+                                     readVer,
+                                     0,
+                                     0,
+                                     needVer);
+                             }
+                             else
+                                 missed.put(key, ver);
+                         }
+                         else
+                             // We must wait for the lock in pessimistic mode.
+                             missed.put(key, ver);
+ 
+                         if (!readCommitted() && !skipVals) {
+                             txEntry = addEntry(READ,
+                                 val,
+                                 null,
+                                 null,
+                                 entry,
+                                 expiryPlc,
+                                 null,
+                                 true,
+                                 -1L,
+                                 -1L,
+                                 null,
+                                 skipStore,
+                                 !deserializeBinary);
+ 
+                             // As optimization, mark as checked immediately
+                             // for non-pessimistic if value is not null.
+                             if (val != null && !pessimistic()) {
+                                 txEntry.markValid();
+ 
+                                 if (needReadVer) {
+                                     assert readVer != null;
+ 
+                                     txEntry.entryReadVersion(readVer);
+                                 }
+                             }
+                         }
+ 
+                         break; // While.
+                     }
+                     catch (GridCacheEntryRemovedException ignored) {
+                         if (log.isDebugEnabled())
+                             log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+                     }
+                     finally {
+                         if (entry != null && readCommitted()) {
+                             if (cacheCtx.isNear()) {
+                                 if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
+                                     if (entry.markObsolete(xidVer))
+                                         cacheCtx.cache().removeEntry(entry);
+                                 }
+                             }
+                             else
+                                 entry.context().evicts().touch(entry, topVer);
+                         }
+                     }
+                 }
+             }
+         }
+ 
+         return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param keys Keys to load.
+      * @param filter Filter.
+      * @param ret Return value.
+      * @param needReadVer Read version flag.
+      * @param singleRmv {@code True} for single remove operation.
+      * @param hasFilters {@code True} if filters not empty.
+      * @param readThrough Read through flag.
+      * @param retval Return value flag.
+      * @param expiryPlc Expiry policy.
+      * @return Load future.
+      */
+     private IgniteInternalFuture<Void> loadMissing(
+         final GridCacheContext cacheCtx,
+         final AffinityTopologyVersion topVer,
+         final Set<KeyCacheObject> keys,
+         final CacheEntryPredicate[] filter,
+         final GridCacheReturn ret,
+         final boolean needReadVer,
+         final boolean singleRmv,
+         final boolean hasFilters,
+         final boolean readThrough,
+         final boolean retval,
+         final boolean keepBinary,
++        final boolean recovery,
+         final ExpiryPolicy expiryPlc) {
+         GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+             new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+                 @Override public void apply(KeyCacheObject key,
+                                             @Nullable Object val,
+                                             @Nullable GridCacheVersion loadVer) {
+                     if (log.isDebugEnabled())
+                         log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+ 
+                     IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
+ 
+                     assert e != null;
+ 
+                     if (needReadVer) {
+                         assert loadVer != null;
+ 
+                         e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+                     }
+ 
+                     if (singleRmv) {
+                         assert !hasFilters && !retval;
+                         assert val == null || Boolean.TRUE.equals(val) : val;
+ 
+                         ret.set(cacheCtx, null, val != null, keepBinary);
+                     }
+                     else {
+                         CacheObject cacheVal = cacheCtx.toCacheObject(val);
+ 
+                         if (e.op() == TRANSFORM) {
+                             GridCacheVersion ver;
+ 
+                             e.readValue(cacheVal);
+ 
+                             try {
+                                 ver = e.cached().version();
+                             }
+                             catch (GridCacheEntryRemovedException ex) {
+                                 assert optimistic() : e;
+ 
+                                 if (log.isDebugEnabled())
+                                     log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+ 
+                                 ver = null;
+                             }
+ 
+                             addInvokeResult(e, cacheVal, ret, ver);
+                         }
+                         else {
+                             boolean success;
+ 
+                             if (hasFilters) {
+                                 success = isAll(e.context(), key, cacheVal, filter);
+ 
+                                 if (!success)
+                                     e.value(cacheVal, false, false);
+                             }
+                             else
+                                 success = true;
+ 
+                             ret.set(cacheCtx, cacheVal, success, keepBinary);
+                         }
+                     }
+                 }
+             };
+ 
+         return loadMissing(
+             cacheCtx,
+             topVer,
+             readThrough,
+             /*async*/true,
+             keys,
+             /*skipVals*/singleRmv,
+             needReadVer,
+             keepBinary,
++            recovery,
+             expiryPlc,
+             c);
+     }
+ 
+     /**
+      * @param cacheCtx Cache context.
+      * @param loadFut Missing keys load future.
+      * @param ret Future result.
+      * @param keepBinary Keep binary flag.
+      * @return Future.
       */
-     public boolean colocatedLocallyMapped() {
-         return colocatedLocallyMapped;
+     private IgniteInternalFuture optimisticPutFuture(
+         final GridCacheContext cacheCtx,
+         IgniteInternalFuture<Void> loadFut,
+         final GridCacheReturn ret,
+         final boolean keepBinary
+     ) {
+         if (implicit()) {
+             // Should never load missing values for implicit transaction as values will be returned
+             // with prepare response, if required.
+             assert loadFut.isDone();
+ 
+             try {
+                 loadFut.get();
+             }
+             catch (IgniteCheckedException e) {
+                 return new GridFinishedFuture<>(e);
+             }
+ 
+             return nonInterruptable(commitNearTxLocalAsync().chain(
+                 new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                     @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                         throws IgniteCheckedException {
+                         try {
+                             txFut.get();
+ 
+                             Object res = implicitRes.value();
+ 
+                             if (implicitRes.invokeResult()) {
+                                 assert res == null || res instanceof Map : implicitRes;
+ 
+                                 res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary);
+                             }
+ 
+                             return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
+                         }
+                         catch (IgniteCheckedException | RuntimeException e) {
+                             if (!(e instanceof NodeStoppingException))
+                                 rollbackNearTxLocalAsync();
+ 
+                             throw e;
+                         }
+                     }
+                 }
+             ));
+         }
+         else {
+             return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+                 @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) throws IgniteCheckedException {
+                     f.get();
+ 
+                     return ret;
+                 }
+             }));
+         }
      }
  
      /**
@@@ -449,6 -2571,152 +2577,151 @@@
      }
  
      /**
+      * @param cacheCtx  Cache context.
+      * @param readThrough Read through flag.
+      * @param async If {@code True}, then loading will happen in a separate thread.
+      * @param keys Keys.
+      * @param skipVals Skip values flag.
+      * @param needVer If {@code true}, version is required for loaded values.
+      * @param c Closure to be applied for loaded values.
+      * @param expiryPlc Expiry policy.
+      * @return Future that is completed when loading is finished (the future carries no value).
+      */
+     private IgniteInternalFuture<Void> localCacheLoadMissing(
+         final GridCacheContext cacheCtx,
+         final AffinityTopologyVersion topVer,
+         final boolean readThrough,
+         boolean async,
+         final Collection<KeyCacheObject> keys,
+         boolean skipVals,
+         boolean needVer,
+         boolean keepBinary,
++        boolean recovery,
+         final ExpiryPolicy expiryPlc,
+         final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
+     ) {
+         assert cacheCtx.isLocal() : cacheCtx.name();
+ 
+         if (!readThrough || !cacheCtx.readThrough()) {
+             for (KeyCacheObject key : keys)
+                 c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
+ 
+             return new GridFinishedFuture<>();
+         }
+ 
+         try {
+             IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
+                 accessPolicy(cacheCtx, keys) :
+                 cacheCtx.cache().expiryPolicy(expiryPlc);
+ 
+             Map<KeyCacheObject, GridCacheVersion> misses = null;
+ 
+             for (KeyCacheObject key : keys) {
+                 while (true) {
+                     IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
+ 
+                     GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) :
+                         txEntry.cached();
+ 
+                     if (entry == null)
+                         continue;
+ 
+                     try {
+                         EntryGetResult res = entry.innerGetVersioned(
+                             null,
+                             this,
 -                            /*readSwap*/true,
 -                            /*unmarshal*/true,
+                             /*update-metrics*/!skipVals,
+                             /*event*/!skipVals,
+                             CU.subjectId(this, cctx),
+                             null,
+                             resolveTaskName(),
+                             expiryPlc0,
+                             txEntry == null ? keepBinary : txEntry.keepBinary(),
+                             null);
+ 
+                         if (res == null) {
+                             if (misses == null)
+                                 misses = new LinkedHashMap<>();
+ 
+                             misses.put(key, entry.version());
+                         }
+                         else
+                             c.apply(key, skipVals ? true : res.value(), res.version());
+ 
+                         break;
+                     }
+                     catch (GridCacheEntryRemovedException ignore) {
+                         if (log.isDebugEnabled())
+                             log.debug("Got removed entry, will retry: " + key);
+ 
+                         if (txEntry != null)
+                             txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion()));
+                     }
+                 }
+             }
+ 
+             if (misses != null) {
+                 final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
+ 
+                 cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
+                     @Override public void apply(KeyCacheObject key, Object val) {
+                         GridCacheVersion ver = misses0.remove(key);
+ 
+                         assert ver != null : key;
+ 
+                         if (val != null) {
+                             CacheObject cacheVal = cacheCtx.toCacheObject(val);
+ 
+                             while (true) {
+                                 GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
+ 
+                                 try {
+                                     EntryGetResult verVal = entry.versionedValue(cacheVal,
+                                         ver,
+                                         null,
+                                         null,
+                                         null

<TRUNCATED>
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------