You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/24 14:12:34 UTC
[44/50] [abbrv] ignite git commit: Merge branch master ignite-2.0 to
ignite-3477
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 6bf96b0,81606d4..fe8d654
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -284,10 -326,2090 +326,2091 @@@ public class GridNearTxLocal extends Gr
}
/**
- * @return {@code True} if transaction contains colocated key mapped to the local node.
+ * @return {@code True} if transaction contains colocated key mapped to the local node.
+ */
+ public boolean colocatedLocallyMapped() {
+ return colocatedLocallyMapped;
+ }
+
+ /**
+ * @param colocatedLocallyMapped {@code True} if transaction contains colocated key mapped to the local node.
+ */
+ public void colocatedLocallyMapped(boolean colocatedLocallyMapped) {
+ this.colocatedLocallyMapped = colocatedLocallyMapped;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) {
+ return entry.detached() || super.ownsLockUnsafe(entry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException {
+ return entry.detached() || super.ownsLock(entry);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param map Map to put.
+ * @param retval Flag indicating whether a value should be returned.
+ * @return Future for put operation.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Map<? extends K, ? extends V> map,
+ boolean retval
+ ) {
+ return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
+ map,
+ null,
+ null,
+ null,
+ retval);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Future for put operation.
+ */
+ public final <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ V val,
+ boolean retval,
+ CacheEntryPredicate filter) {
+ return putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ val,
+ null,
+ null,
+ retval,
+ filter);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @return Operation future.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ EntryProcessor<K, V, Object> entryProcessor,
+ Object... invokeArgs) {
+ return (IgniteInternalFuture)putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ null,
+ entryProcessor,
+ invokeArgs,
+ true,
+ null);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param map Entry processors map.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @return Operation future.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
+ Object... invokeArgs
+ ) {
+ return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
+ null,
+ map,
+ invokeArgs,
+ null,
+ true);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param drMap DR map to put.
+ * @return Future for DR put operation.
+ */
+ public IgniteInternalFuture<?> putAllDrAsync(
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheDrInfo> drMap
+ ) {
+ Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+ @Override public Object apply(GridCacheDrInfo val) {
+ return val.value();
+ }
+ });
+
+ return this.<Object, Object>putAllAsync0(cacheCtx,
+ null,
+ map,
+ null,
+ null,
+ drMap,
+ false);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param drMap DR map.
+ * @return Future for asynchronous remove.
+ */
+ public IgniteInternalFuture<?> removeAllDrAsync(
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheVersion> drMap
+ ) {
+ return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys to remove.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for asynchronous remove.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<? extends K> keys,
+ boolean retval,
+ CacheEntryPredicate filter,
+ boolean singleRmv
+ ) {
+ return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv);
+ }
+
+ /**
+ * Internal method for single update operation.
+ *
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Operation future.
+ */
+ private <K, V> IgniteInternalFuture putAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ @Nullable V val,
+ @Nullable EntryProcessor<K, V, Object> entryProcessor,
+ @Nullable final Object[] invokeArgs,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter
+ ) {
+ assert key != null;
+
+ try {
+ beforePut(cacheCtx, retval);
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ opCtx != null ? opCtx.expiry() : null,
+ entryProcessor,
+ invokeArgs,
+ retval,
+ /*lockOnly*/false,
+ filters,
+ ret,
+ opCtx != null && opCtx.skipStore(),
+ /*singleRmv*/false,
+ keepBinary,
++ opCtx != null && opCtx.recovery(),
+ dataCenterId);
+
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null)
+ loadFut.get();
+
+ final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey);
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for put on key: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ /*read*/entryProcessor != null, // Needed to force load from store.
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override public GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/false,
+ retval,
+ /*read*/false,
+ -1L,
+ filters,
+ /*computeInvoke*/true);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ }
+ else
+ return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
+ * Internal method for all put and invoke (transform) operations. Only one of the {@code map} and
+ * {@code invokeMap} maps must be non-null.
+ *
+ * @param cacheCtx Context.
+ * @param map Key-value map to store.
+ * @param invokeMap Invoke map.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param drMap DR map.
+ * @param retval Flag indicating whether a value should be returned.
+ * @return Operation future.
+ */
+ @SuppressWarnings("unchecked")
+ private <K, V> IgniteInternalFuture putAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable Map<? extends K, ? extends V> map,
+ @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
+ @Nullable final Object[] invokeArgs,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
+ final boolean retval
+ ) {
+ try {
+ beforePut(cacheCtx, retval);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+ assert map != null || invokeMap != null;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
+ // Cached entry may be passed only from entry wrapper.
+ final Map<?, ?> map0 = map;
+ final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+
+ if (log.isDebugEnabled())
+ log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+
+ assert map0 != null || invokeMap0 != null;
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+ if (implicit())
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return new GridFinishedFuture<>(ret.success(true));
+ }
+
+ try {
+ Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
+
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
+
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ keySet,
+ opCtx != null ? opCtx.expiry() : null,
+ map0,
+ invokeMap0,
+ invokeArgs,
+ retval,
+ false,
+ CU.filterArray(null),
+ ret,
+ enlisted,
+ drMap,
+ null,
+ opCtx != null && opCtx.skipStore(),
+ false,
+ keepBinary,
++ opCtx != null && opCtx.recovery(),
+ dataCenterId);
+
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null) {
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ /*read*/invokeMap != null, // Needed to force load from store.
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override public GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/false,
+ retval,
+ /*read*/false,
+ -1L,
+ CU.filterArray(null),
+ /*computeInvoke*/true);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ }
+ else
+ return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key to enlist.
+ * @param val Value.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param entryProcessor Entry processor (for invoke operation).
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for entry values loading.
+ */
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ KeyCacheObject cacheKey,
+ Object val,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable EntryProcessor<K, V, Object> entryProcessor,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ boolean skipStore,
+ final boolean singleRmv,
+ boolean keepBinary,
++ boolean recovery,
+ Byte dataCenterId) {
+ try {
- addActiveCache(cacheCtx);
++ addActiveCache(cacheCtx, recovery);
+
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
+ if (entryProcessor != null)
+ transform = true;
+
+ GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ /*drVer*/drVer,
+ /*drTtl*/-1L,
+ /*drExpireTime*/-1L,
+ ret,
+ /*enlisted*/null,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer,
- keepBinary);
++ keepBinary,
++ recovery);
+
+ if (loadMissed) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ Collections.singleton(cacheKey),
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+ retval,
+ keepBinary,
++ recovery,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * Internal routine for <tt>putAll(..)</tt>
+ *
+ * @param cacheCtx Cache context.
+ * @param keys Keys to enlist.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param lookup Value lookup map ({@code null} for remove).
+ * @param invokeMap Map with entry processors for invoke operation.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param enlisted Collection of keys enlisted into this transaction.
+ * @param drPutMap DR put map (optional).
+ * @param drRmvMap DR remove map (optional).
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @param keepBinary Keep binary flag.
+ * @param dataCenterId Optional data center ID.
+ * @return Future for missing values loading.
+ */
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<?> keys,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable Map<?, ?> lookup,
+ @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ Collection<KeyCacheObject> enlisted,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
+ boolean skipStore,
+ final boolean singleRmv,
+ final boolean keepBinary,
++ final boolean recovery,
+ Byte dataCenterId
+ ) {
+ assert retval || invokeMap == null;
+
+ try {
- addActiveCache(cacheCtx);
++ addActiveCache(cacheCtx, recovery);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ boolean rmv = lookup == null && invokeMap == null;
+
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
+ try {
+ // Set transform flag for transaction.
+ if (invokeMap != null)
+ transform = true;
+
+ Set<KeyCacheObject> missedForLoad = null;
+
+ for (Object key : keys) {
+ if (key == null) {
+ rollback();
+
+ throw new NullPointerException("Null key.");
+ }
+
+ Object val = rmv || lookup == null ? null : lookup.get(key);
+ EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
+
+ GridCacheVersion drVer;
+ long drTtl;
+ long drExpireTime;
+
+ if (drPutMap != null) {
+ GridCacheDrInfo info = drPutMap.get(key);
+
+ assert info != null;
+
+ drVer = info.version();
+ drTtl = info.ttl();
+ drExpireTime = info.expireTime();
+ }
+ else if (drRmvMap != null) {
+ assert drRmvMap.get(key) != null;
+
+ drVer = drRmvMap.get(key);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else if (dataCenterId != null) {
+ drVer = cctx.versions().next(dataCenterId);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else {
+ drVer = null;
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+
+ if (!rmv && val == null && entryProcessor == null) {
+ setRollbackOnly();
+
+ throw new NullPointerException("Null value.");
+ }
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ drVer,
+ drTtl,
+ drExpireTime,
+ ret,
+ enlisted,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer,
- keepBinary);
++ keepBinary,
++ recovery);
+
+ if (loadMissed) {
+ if (missedForLoad == null)
+ missedForLoad = new HashSet<>();
+
+ missedForLoad.add(cacheKey);
+ }
+ }
+
+ if (missedForLoad != null) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ missedForLoad,
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+ retval,
+ keepBinary,
++ recovery,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param retval Return value flag.
+ * @param lockOnly Lock only flag.
+ * @param filter Filter.
+ * @param drVer DR version.
+ * @param drTtl DR ttl.
+ * @param drExpireTime DR expire time.
+ * @param ret Return value.
+ * @param enlisted Enlisted keys collection.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param needVal {@code True} if value is needed.
+ * @param needReadVer {@code True} if need read entry version.
+ * @return {@code True} if entry value should be loaded.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ final KeyCacheObject cacheKey,
+ @Nullable final Object val,
+ @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
+ @Nullable final Object[] invokeArgs,
+ @Nullable final ExpiryPolicy expiryPlc,
+ final boolean retval,
+ final boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheVersion drVer,
+ final long drTtl,
+ long drExpireTime,
+ final GridCacheReturn ret,
+ @Nullable final Collection<KeyCacheObject> enlisted,
+ boolean skipStore,
+ boolean singleRmv,
+ boolean hasFilters,
+ final boolean needVal,
+ boolean needReadVer,
- boolean keepBinary
++ boolean keepBinary,
++ boolean recovery
+ ) throws IgniteCheckedException {
+ boolean loadMissed = false;
+
+ final boolean rmv = val == null && entryProcessor == null;
+
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+ IgniteTxEntry txEntry = entry(txKey);
+
+ // First time access.
+ if (txEntry == null) {
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
+
+ try {
+ entry.unswap(false);
+
+ // Check if lock is being explicitly acquired by the same thread.
+ if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+ entry.lockedByThread(threadId, xidVer)) {
+ throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+ "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+ ", entry=" + entry +
+ ", xidVer=" + xidVer +
+ ", threadId=" + threadId +
+ ", locNodeId=" + cctx.localNodeId() + ']');
+ }
+
+ CacheObject old = null;
+ GridCacheVersion readVer = null;
+
+ if (optimistic() && !implicit()) {
+ try {
+ if (needReadVer) {
+ EntryGetResult res = primaryLocal(entry) ?
+ entry.innerGetVersioned(
+ null,
+ this,
- /*swap*/false,
- /*unmarshal*/retval || needVal,
+ /*metrics*/retval,
+ /*events*/retval,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null,
+ keepBinary,
+ null) : null;
+
+ if (res != null) {
+ old = res.value();
+ readVer = res.version();
+ }
+ }
+ else {
+ old = entry.innerGet(
+ null,
+ this,
- /*swap*/false,
- /*read-through*/false,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null,
+ keepBinary);
+ }
+ }
+ catch (ClusterTopologyCheckedException e) {
+ entry.context().evicts().touch(entry, topologyVersion());
+
+ throw e;
+ }
+ }
+ else
- old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
++ old = entry.rawGet();
+
+ final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+ if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+ ret.set(cacheCtx, old, false, keepBinary);
+
+ if (!readCommitted()) {
+ if (optimistic() && serializable()) {
+ txEntry = addEntry(op,
+ old,
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+ }
+ else {
+ txEntry = addEntry(READ,
+ old,
+ null,
+ null,
+ entry,
+ null,
+ CU.empty0(),
+ false,
+ -1L,
+ -1L,
+ null,
+ skipStore,
+ keepBinary);
+ }
+
+ txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
+ }
+
+ if (readCommitted())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ break; // While.
+ }
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+
- if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
++ if (!implicit() && readCommitted())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ if (enlisted != null)
+ enlisted.add(cacheKey);
+
+ if (!pessimistic() && !implicit()) {
+ txEntry.markValid();
+
+ if (old == null) {
+ if (needVal)
+ loadMissed = true;
+ else {
+ assert !implicit() || !transform : this;
+ assert txEntry.op() != TRANSFORM : txEntry;
+
+ if (retval)
+ ret.set(cacheCtx, null, true, keepBinary);
+ else
+ ret.success(true);
+ }
+ }
+ else {
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
+
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true, keepBinary);
+ else {
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : txEntry;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version " +
+ "[err=" + ex.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(txEntry, old, ret, ver);
+ }
+ else
+ ret.success(true);
+ }
+ }
+ }
+ // Pessimistic.
+ else {
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true, keepBinary);
+ else
+ ret.success(true);
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction putAll0 method: " + entry);
+ }
+ }
+ }
+ else {
+ if (entryProcessor == null && txEntry.op() == TRANSFORM)
+ throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+ "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
+
+ GridCacheEntryEx entry = txEntry.cached();
+
+ CacheObject v = txEntry.value();
+
+ boolean del = txEntry.op() == DELETE && rmv;
+
+ if (!del) {
+ if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
+ ret.set(cacheCtx, v, false, keepBinary);
+
+ return loadMissed;
+ }
+
+ GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+ v != null ? UPDATE : CREATE;
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+
+ if (enlisted != null)
+ enlisted.add(cacheKey);
+
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(txEntry, txEntry.value(), ret, ver);
+ }
+ }
+
+ if (!pessimistic()) {
+ txEntry.markValid();
+
+ if (retval && !transform)
+ ret.set(cacheCtx, v, true, keepBinary);
+ else
+ ret.success(true);
+ }
+ }
+
+ return loadMissed;
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys to remove.
+ * @param drMap DR map.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for asynchronous remove.
+ */
+ @SuppressWarnings("unchecked")
+ private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable final Collection<? extends K> keys,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter,
+ boolean singleRmv) {
+ try {
+ checkUpdatesAllowed(cacheCtx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+
+ if (retval)
+ needReturnValue(true);
+
+ final Collection<?> keys0;
+
+ if (drMap != null) {
+ assert keys == null;
+
+ keys0 = drMap.keySet();
+ }
+ else
+ keys0 = keys;
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
+ assert keys0 != null;
+
+ if (log.isDebugEnabled())
+ log.debug(S.toString("Called removeAllAsync(...)",
+ "tx", this, false,
+ "keys", keys0, true,
+ "implicit", implicit, false,
+ "retval", retval, false));
+
+ try {
+ checkValid();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ if (F.isEmpty(keys0)) {
+ if (implicit()) {
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ return new GridFinishedFuture<>(ret.success(true));
+ }
+
+ init();
+
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>();
+
+ ExpiryPolicy plc;
+
+ final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+ if (!F.isEmpty(filters))
+ plc = opCtx != null ? opCtx.expiry() : null;
+ else
+ plc = null;
+
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ keys0,
+ plc,
+ /*lookup map*/null,
+ /*invoke map*/null,
+ /*invoke arguments*/null,
+ retval,
+ /*lock only*/false,
+ filters,
+ ret,
+ enlisted,
+ null,
+ drMap,
+ opCtx != null && opCtx.skipStore(),
+ singleRmv,
+ keepBinary,
++ opCtx != null && opCtx.recovery(),
+ dataCenterId
+ );
+
+ if (log.isDebugEnabled())
+ log.debug("Remove keys: " + enlisted);
+
+ // Acquire locks only after having added operation to the write set.
+ // Otherwise, during rollback we will not know whether locks need
+ // to be rolled back.
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null) {
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for remove on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/true,
+ retval,
+ /*read*/false,
+ -1L,
+ filters,
+ /*computeInvoke*/false);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ else {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
+
+ return nonInterruptable(commitNearTxLocalAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
+
+ return new GridCacheReturn(cacheCtx, true, keepBinary,
+ implicitRes.value(), implicitRes.success());
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ rollbackNearTxLocalAsync();
+
+ throw e;
+ }
+ }
+ }));
+ }
+ else {
+ return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+ throws IgniteCheckedException {
+ f.get();
+
+ return ret;
+ }
+ }));
+ }
+ }
+ }
+
+ /**
+ * Reads the given keys within this transaction. An empty key collection yields a finished future with an
+ * empty map. Otherwise the keys are enlisted through {@code enlistRead}. For a pessimistic transaction
+ * that is not read-committed (and when values are not skipped) locks are requested with
+ * {@code txLockAsync} and the values are read from the transaction entries after the lock is acquired.
+ * In all other cases keys that are still missed are resolved through {@code checkMissed}.
+ * An {@code IgniteCheckedException} that reaches the outer {@code catch} marks the transaction as
+ * rollback-only and is returned as a failed future.
+ *
+ * @param cacheCtx Cache context.
+ * @param entryTopVer Topology version passed to {@code enlistRead}; it is also used for
+ * {@code checkMissed} when {@code topologyVersionSnapshot()} returns {@code null}. May be {@code null}.
+ * @param keys Keys to get.
+ * @param deserializeBinary Deserialize binary flag.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
+ * @param skipStore Skip store flag.
+ * @param recovery Recovery flag, forwarded to {@code enlistRead} and {@code checkMissed}.
+ * @param needVer If {@code true}, values are read with {@code innerGetVersioned} and the read version
+ * is passed to {@code addResult}.
+ * @return Future for this get.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
+ final GridCacheContext cacheCtx,
+ @Nullable final AffinityTopologyVersion entryTopVer,
+ Collection<KeyCacheObject> keys,
+ final boolean deserializeBinary,
+ final boolean skipVals,
+ final boolean keepCacheObjects,
+ final boolean skipStore,
++ final boolean recovery,
+ final boolean needVer) {
+ if (F.isEmpty(keys))
+ return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+
+ init();
+
+ int keysCnt = keys.size();
+
+ boolean single = keysCnt == 1;
+
+ try {
+ checkValid();
+
+ final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+
+ final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+
+ final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
+ entryTopVer,
+ keys,
+ expiryPlc,
+ retMap,
+ missed,
+ keysCnt,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
++ recovery,
+ needVer);
+
+ if (single && missed.isEmpty())
+ return new GridFinishedFuture<>(retMap);
+
+ // Handle locks.
+ if (pessimistic() && !readCommitted() && !skipVals) {
+ if (expiryPlc == null)
+ expiryPlc = cacheCtx.expiry();
+
+ long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
+ long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
+ timeout,
+ this,
+ true,
+ true,
+ isolation,
+ isInvalidate(),
+ createTtl,
+ accessTtl);
+
+ final ExpiryPolicy expiryPlc0 = expiryPlc;
+
+ PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+ @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+
+ // Load keys only after the locks have been acquired.
+ for (KeyCacheObject cacheKey : lockKeys) {
+ K keyVal = (K)
+ (keepCacheObjects ? cacheKey :
+ cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
+
+ if (retMap.containsKey(keyVal))
+ // We already have a return value.
+ continue;
+
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+ IgniteTxEntry txEntry = entry(txKey);
+
+ assert txEntry != null;
+
+ // Check if there is cached value.
+ while (true) {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ CacheObject val = null;
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ try {
+ Object transformClo =
+ (!F.isEmpty(txEntry.entryProcessors()) &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.entryProcessors()) : null;
+
+ if (needVer) {
+ getRes = cached.innerGetVersioned(
+ null,
+ GridNearTxLocal.this,
- /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(GridNearTxLocal.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary(),
+ null);
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else{
+ val = cached.innerGet(
+ null,
+ GridNearTxLocal.this,
- cacheCtx.isSwapOrOffheapEnabled(),
- /*read-through*/false,
+ /*metrics*/true,
+ /*events*/!skipVals,
+ /*temporary*/false,
+ CU.subjectId(GridNearTxLocal.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
+
+ // If value is in cache and passed the filter.
+ if (val != null) {
+ missed.remove(cacheKey);
+
+ txEntry.setAndMarkValid(val);
+
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ cacheCtx.addResult(retMap,
+ cacheKey,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+
+ if (readVer != null)
+ txEntry.entryReadVersion(readVer);
+ }
+
+ // Even though we bring the value back from lock acquisition,
+ // we still need to recheck primary node for consistent values
+ // in case of concurrent transactional locks.
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed exception in get postLock (will retry): " +
+ cached);
+
+ txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
+ }
+ }
+ }
+
+ if (!missed.isEmpty() && cacheCtx.isLocal()) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ retMap,
+ missed,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
++ recovery,
+ needVer,
+ expiryPlc0);
+ }
+
+ return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+ }
+ };
+
+ FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+ @Override Map<K, V> finish(Map<K, V> loaded) {
+ retMap.putAll(loaded);
+
+ return retMap;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+
+ return fut1.isDone() ?
+ new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
+ new GridEmbeddedFuture<>(finClos, fut1);
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return plc2.apply(false, e);
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return new GridEmbeddedFuture<>(
+ fut,
+ plc2,
+ finClos);
+ }
+ }
+ else {
+ assert optimistic() || readCommitted() || skipVals;
+
+ if (!missed.isEmpty()) {
+ if (!readCommitted())
+ for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
+ KeyCacheObject cacheKey = it.next();
+
+ K keyVal =
+ (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
+
+ if (retMap.containsKey(keyVal))
+ it.remove();
+ }
+
+ if (missed.isEmpty())
+ return new GridFinishedFuture<>(retMap);
+
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ retMap,
+ missed,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
++ recovery,
+ needVer,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>(retMap);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ setRollbackOnly();
+
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * Enlists the given keys for read in this transaction. A key that already has a transaction entry is
+ * served from that entry, or from the entry's cached cache entry when the transaction entry has no value
+ * yet. A key accessed for the first time is read from its cache entry, or is recorded in {@code missed}
+ * when no value is available or when the lock has to be awaited first.
+ *
+ * @param cacheCtx Cache context.
+ * @param entryTopVer Topology version used to obtain entries; if {@code null}, {@code topologyVersion()}
+ * is used instead.
+ * @param keys Keys to enlist.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param map Return map.
+ * @param missed Map of missed keys.
+ * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+ * @param deserializeBinary Deserialize binary flag.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
+ * @param skipStore Skip store flag.
+ * @param recovery Recovery flag, passed to {@code addActiveCache}.
+ * @param needVer If {@code true}, entry versions are obtained together with values and passed to
+ * {@code addResult}.
+ * @throws IgniteCheckedException If failed.
+ * @return Keys that were accessed for the first time in this transaction (collected only when values are
+ * not skipped); an empty list if there are none.
+ */
+ @SuppressWarnings({"RedundantTypeArguments"})
+ private <K, V> Collection<KeyCacheObject> enlistRead(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<KeyCacheObject> keys,
+ @Nullable ExpiryPolicy expiryPlc,
+ Map<K, V> map,
+ Map<KeyCacheObject, GridCacheVersion> missed,
+ int keysCnt,
+ boolean deserializeBinary,
+ boolean skipVals,
+ boolean keepCacheObjects,
+ boolean skipStore,
++ boolean recovery,
+ final boolean needVer
+ ) throws IgniteCheckedException {
+ assert !F.isEmpty(keys);
+ assert keysCnt == keys.size();
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ boolean single = keysCnt == 1;
+
+ Collection<KeyCacheObject> lockKeys = null;
+
+ AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
+
+ boolean needReadVer = (serializable() && optimistic()) || needVer;
+
+ // In this loop we cover only read-committed or optimistic transactions.
+ // Transactions that are pessimistic and not read-committed are covered
+ // outside of this loop.
+ for (KeyCacheObject key : keys) {
+ if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
- addActiveCache(cacheCtx);
++ addActiveCache(cacheCtx, recovery);
+
+ IgniteTxKey txKey = cacheCtx.txKey(key);
+
+ // Check write map (always check writes first).
+ IgniteTxEntry txEntry = entry(txKey);
+
+ // Either non-read-committed or there was a previous write.
+ if (txEntry != null) {
+ CacheObject val = txEntry.value();
+
+ if (txEntry.hasValue()) {
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ if (val != null) {
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ if (txEntry.op() != READ)
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+ else {
+ ver = txEntry.entryReadVersion();
+
+ if (ver == null && pessimistic()) {
+ while (true) {
+ try {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ ver = cached.isNear() ?
+ ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
+ }
+
+ if (ver == null) {
+ assert optimistic() && repeatableRead() : this;
+
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+ }
+ }
+
+ assert ver != null;
+ }
+
+ cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
+ ver, 0, 0);
+ }
+ }
+ else {
+ assert txEntry.op() == TRANSFORM;
+
+ while (true) {
+ try {
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ Object transformClo =
+ (txEntry.op() == TRANSFORM &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.entryProcessors()) : null;
+
+ if (needVer) {
+ getRes = txEntry.cached().innerGetVersioned(
+ null,
+ this,
- /*swap*/true,
- /*unmarshal*/true,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary(),
+ null);
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else {
+ val = txEntry.cached().innerGet(
+ null,
+ this,
- /*swap*/true,
- /*read-through*/false,
+ /*metrics*/true,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
+
+ if (val != null) {
+ if (!readCommitted() && !skipVals)
+ txEntry.readValue(val);
+
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+ }
+ else
+ missed.put(key, txEntry.cached().version());
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
+ }
+ }
+ // First time access within transaction.
+ else {
+ if (lockKeys == null && !skipVals)
+ lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
+
+ if (!single && !skipVals)
+ lockKeys.add(key);
+
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+
+ try {
+ GridCacheVersion ver = entry.version();
+
+ CacheObject val = null;
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ if (!pessimistic() || readCommitted() && !skipVals) {
+ IgniteCacheExpiryPolicy accessPlc =
+ optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
+
+ if (needReadVer) {
+ getRes = primaryLocal(entry) ?
+ entry.innerGetVersioned(
+ null,
+ this,
- /*swap*/true,
- /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc,
+ !deserializeBinary,
+ null) : null;
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else {
+ val = entry.innerGet(
+ null,
+ this,
- /*swap*/true,
- /*read-through*/false,
+ /*metrics*/true,
+ /*event*/true,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc,
+ !deserializeBinary);
+ }
+
+ if (val != null) {
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+ }
+ else
+ missed.put(key, ver);
+ }
+ else
+ // We must wait for the lock in pessimistic mode.
+ missed.put(key, ver);
+
+ if (!readCommitted() && !skipVals) {
+ txEntry = addEntry(READ,
+ val,
+ null,
+ null,
+ entry,
+ expiryPlc,
+ null,
+ true,
+ -1L,
+ -1L,
+ null,
+ skipStore,
+ !deserializeBinary);
+
+ // As optimization, mark as checked immediately
+ // for non-pessimistic if value is not null.
+ if (val != null && !pessimistic()) {
+ txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(readVer);
+ }
+ }
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+ }
+ finally {
+ if (entry != null && readCommitted()) {
+ if (cacheCtx.isNear()) {
+ if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
+ if (entry.markObsolete(xidVer))
+ cacheCtx.cache().removeEntry(entry);
+ }
+ }
+ else
+ entry.context().evicts().touch(entry, topVer);
+ }
+ }
+ }
+ }
+ }
+
+ return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
+ }
+
+ /**
+ * Creates a closure that applies every loaded value to its transaction entry and to {@code ret}, then
+ * passes that closure to the {@code loadMissing} variant that accepts a closure, requesting an
+ * asynchronous load. For a single remove the closure only records in {@code ret} whether a value exists;
+ * for a {@code TRANSFORM} entry it stores the read value and adds the invoke result; otherwise it sets
+ * the value in {@code ret} together with the result of the filter check.
+ *
+ * @param cacheCtx Cache context.
+ * @param topVer Topology version.
+ * @param keys Keys to load.
+ * @param filter Filter.
+ * @param ret Return value.
+ * @param needReadVer Read version flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param readThrough Read through flag.
+ * @param retval Return value flag.
+ * @param keepBinary Keep binary flag.
+ * @param recovery Recovery flag.
+ * @param expiryPlc Expiry policy.
+ * @return Load future.
+ */
+ private IgniteInternalFuture<Void> loadMissing(
+ final GridCacheContext cacheCtx,
+ final AffinityTopologyVersion topVer,
+ final Set<KeyCacheObject> keys,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ final boolean needReadVer,
+ final boolean singleRmv,
+ final boolean hasFilters,
+ final boolean readThrough,
+ final boolean retval,
+ final boolean keepBinary,
++ final boolean recovery,
+ final ExpiryPolicy expiryPlc) {
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+ new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+ @Override public void apply(KeyCacheObject key,
+ @Nullable Object val,
+ @Nullable GridCacheVersion loadVer) {
+ if (log.isDebugEnabled())
+ log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+
+ IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
+
+ assert e != null;
+
+ if (needReadVer) {
+ assert loadVer != null;
+
+ e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ }
+
+ if (singleRmv) {
+ assert !hasFilters && !retval;
+ assert val == null || Boolean.TRUE.equals(val) : val;
+
+ ret.set(cacheCtx, null, val != null, keepBinary);
+ }
+ else {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+ if (e.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ e.readValue(cacheVal);
+
+ try {
+ ver = e.cached().version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : e;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(e, cacheVal, ret, ver);
+ }
+ else {
+ boolean success;
+
+ if (hasFilters) {
+ success = isAll(e.context(), key, cacheVal, filter);
+
+ if (!success)
+ e.value(cacheVal, false, false);
+ }
+ else
+ success = true;
+
+ ret.set(cacheCtx, cacheVal, success, keepBinary);
+ }
+ }
+ }
+ };
+
+ return loadMissing(
+ cacheCtx,
+ topVer,
+ readThrough,
+ /*async*/true,
+ keys,
+ /*skipVals*/singleRmv,
+ needReadVer,
+ keepBinary,
++ recovery,
+ expiryPlc,
+ c);
+ }
+
+ /**
+ * Creates the result future from the missing-keys load future. For an implicit transaction the load
+ * future (asserted to be done) is checked for an error, the transaction is committed asynchronously and
+ * the commit result is converted into a {@code GridCacheReturn} built from {@code implicitRes}; if the
+ * commit fails, the transaction is rolled back unless the failure is a {@code NodeStoppingException},
+ * and the failure is rethrown. For a non-implicit transaction the returned future yields {@code ret}
+ * once {@code loadFut} has completed.
+ *
+ * @param cacheCtx Cache context.
+ * @param loadFut Missing keys load future.
+ * @param ret Future result.
+ * @param keepBinary Keep binary flag.
+ * @return Future.
*/
- public boolean colocatedLocallyMapped() {
- return colocatedLocallyMapped;
+ private IgniteInternalFuture optimisticPutFuture(
+ final GridCacheContext cacheCtx,
+ IgniteInternalFuture<Void> loadFut,
+ final GridCacheReturn ret,
+ final boolean keepBinary
+ ) {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
+
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return nonInterruptable(commitNearTxLocalAsync().chain(
+ new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
+
+ Object res = implicitRes.value();
+
+ if (implicitRes.invokeResult()) {
+ assert res == null || res instanceof Map : implicitRes;
+
+ res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary);
+ }
+
+ return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ if (!(e instanceof NodeStoppingException))
+ rollbackNearTxLocalAsync();
+
+ throw e;
+ }
+ }
+ }
+ ));
+ }
+ else {
+ return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) throws IgniteCheckedException {
+ f.get();
+
+ return ret;
+ }
+ }));
+ }
}
/**
@@@ -449,6 -2571,152 +2577,151 @@@
}
/**
+ * @param cacheCtx Cache context.
+ * @param topVer Topology version.
+ * @param readThrough Read through flag.
+ * @param async If {@code true}, then loading will happen in a separate thread.
+ * @param keys Keys.
+ * @param skipVals Skip values flag.
+ * @param needVer If {@code true} version is required for loaded values.
+ * @param keepBinary Keep binary flag.
+ * @param recovery Recovery flag.
+ * @param c Closure to be applied for loaded values.
+ * @param expiryPlc Expiry policy.
+ * @return Future for the load operation.
+ */
+ private IgniteInternalFuture<Void> localCacheLoadMissing(
+ final GridCacheContext cacheCtx,
+ final AffinityTopologyVersion topVer,
+ final boolean readThrough,
+ boolean async,
+ final Collection<KeyCacheObject> keys,
+ boolean skipVals,
+ boolean needVer,
+ boolean keepBinary,
++ boolean recovery,
+ final ExpiryPolicy expiryPlc,
+ final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
+ ) {
+ assert cacheCtx.isLocal() : cacheCtx.name();
+
+ if (!readThrough || !cacheCtx.readThrough()) {
+ for (KeyCacheObject key : keys)
+ c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
+
+ return new GridFinishedFuture<>();
+ }
+
+ try {
+ IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
+ accessPolicy(cacheCtx, keys) :
+ cacheCtx.cache().expiryPolicy(expiryPlc);
+
+ Map<KeyCacheObject, GridCacheVersion> misses = null;
+
+ for (KeyCacheObject key : keys) {
+ while (true) {
+ IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
+
+ GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) :
+ txEntry.cached();
+
+ if (entry == null)
+ continue;
+
+ try {
+ EntryGetResult res = entry.innerGetVersioned(
+ null,
+ this,
- /*readSwap*/true,
- /*unmarshal*/true,
+ /*update-metrics*/!skipVals,
+ /*event*/!skipVals,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ expiryPlc0,
+ txEntry == null ? keepBinary : txEntry.keepBinary(),
+ null);
+
+ if (res == null) {
+ if (misses == null)
+ misses = new LinkedHashMap<>();
+
+ misses.put(key, entry.version());
+ }
+ else
+ c.apply(key, skipVals ? true : res.value(), res.version());
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry, will retry: " + key);
+
+ if (txEntry != null)
+ txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion()));
+ }
+ }
+ }
+
+ if (misses != null) {
+ final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
+
+ cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
+ @Override public void apply(KeyCacheObject key, Object val) {
+ GridCacheVersion ver = misses0.remove(key);
+
+ assert ver != null : key;
+
+ if (val != null) {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+ while (true) {
+ GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
+
+ try {
+ EntryGetResult verVal = entry.versionedValue(cacheVal,
+ ver,
+ null,
+ null,
+ null
<TRUNCATED>
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------