You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/03/23 07:15:40 UTC
[05/51] [abbrv] ignite git commit: Internal cache API cleanup.
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8ed749c..81606d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -18,24 +18,37 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.cache.Cache;
+import javax.cache.CacheException;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -46,31 +59,52 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -83,7 +117,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
* Replicated user transaction.
*/
@SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoCloseable {
/** */
private static final long serialVersionUID = 0L;
@@ -135,6 +169,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** */
private boolean hasRemoteLocks;
+ /** If this transaction contains transform entries. */
+ protected boolean transform;
+
+ /** */
+ @GridToStringExclude
+ private TransactionProxyImpl proxy;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -244,14 +285,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* Marks transaction to check if commit on backup.
*/
- public void markForBackupCheck() {
+ void markForBackupCheck() {
needCheckBackup = true;
}
/**
* @return If need to check tx commit on backup.
*/
- public boolean onNeedCheckBackup() {
+ boolean onNeedCheckBackup() {
Boolean check = needCheckBackup;
if (check != null && check) {
@@ -260,52 +301,2127 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return true;
}
- return false;
- }
+ return false;
+ }
+
+ /**
+ * @return {@code True} if the backup check flag has been set (i.e. it is non-null).
+ */
+ boolean needCheckBackup() {
+ return needCheckBackup != null;
+ }
+
+ /**
+ * @return {@code True} if transaction contains at least one near cache key mapped to the local node.
+ */
+ public boolean nearLocallyMapped() {
+ return nearLocallyMapped;
+ }
+
+ /**
+ * @param nearLocallyMapped {@code True} if transaction contains near key mapped to the local node.
+ */
+ void nearLocallyMapped(boolean nearLocallyMapped) {
+ this.nearLocallyMapped = nearLocallyMapped;
+ }
+
+ /**
+ * @return {@code True} if transaction contains colocated key mapped to the local node.
+ */
+ public boolean colocatedLocallyMapped() {
+ return colocatedLocallyMapped;
+ }
+
+ /**
+ * @param colocatedLocallyMapped {@code True} if transaction contains colocated key mapped to the local node.
+ */
+ public void colocatedLocallyMapped(boolean colocatedLocallyMapped) {
+ this.colocatedLocallyMapped = colocatedLocallyMapped;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) {
+ return entry.detached() || super.ownsLockUnsafe(entry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException {
+ return entry.detached() || super.ownsLock(entry);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param map Map to put.
+ * @param retval Flag indicating whether a value should be returned.
+ * @return Future for put operation.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Map<? extends K, ? extends V> map,
+ boolean retval
+ ) {
+ return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
+ map,
+ null,
+ null,
+ null,
+ retval);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Future for put operation.
+ */
+ public final <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ V val,
+ boolean retval,
+ CacheEntryPredicate filter) {
+ return putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ val,
+ null,
+ null,
+ retval,
+ filter);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @return Operation future.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ EntryProcessor<K, V, Object> entryProcessor,
+ Object... invokeArgs) {
+ return (IgniteInternalFuture)putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ null,
+ entryProcessor,
+ invokeArgs,
+ true,
+ null);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param map Entry processors map.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @return Operation future.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
+ Object... invokeArgs
+ ) {
+ return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
+ null,
+ map,
+ invokeArgs,
+ null,
+ true);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param drMap DR map to put.
+ * @return Future for DR put operation.
+ */
+ public IgniteInternalFuture<?> putAllDrAsync(
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheDrInfo> drMap
+ ) {
+ Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+ @Override public Object apply(GridCacheDrInfo val) {
+ return val.value();
+ }
+ });
+
+ return this.<Object, Object>putAllAsync0(cacheCtx,
+ null,
+ map,
+ null,
+ null,
+ drMap,
+ false);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param drMap DR map.
+ * @return Future for asynchronous remove.
+ */
+ public IgniteInternalFuture<?> removeAllDrAsync(
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheVersion> drMap
+ ) {
+ return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys to remove.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for asynchronous remove.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<? extends K> keys,
+ boolean retval,
+ CacheEntryPredicate filter,
+ boolean singleRmv
+ ) {
+ return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv);
+ }
+
+ /**
+ * Internal method for single update operation.
+ *
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Operation future.
+ */
+ private <K, V> IgniteInternalFuture putAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ @Nullable V val,
+ @Nullable EntryProcessor<K, V, Object> entryProcessor,
+ @Nullable final Object[] invokeArgs,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter
+ ) {
+ assert key != null;
+
+ try {
+ beforePut(cacheCtx, retval);
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ opCtx != null ? opCtx.expiry() : null,
+ entryProcessor,
+ invokeArgs,
+ retval,
+ /*lockOnly*/false,
+ filters,
+ ret,
+ opCtx != null && opCtx.skipStore(),
+ /*singleRmv*/false,
+ keepBinary,
+ dataCenterId);
+
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null)
+ loadFut.get();
+
+ final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey);
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for put on key: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ /*read*/entryProcessor != null, // Needed to force load from store.
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override public GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/false,
+ retval,
+ /*read*/false,
+ -1L,
+ filters,
+ /*computeInvoke*/true);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ }
+ else
+ return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
+ * Internal method for all put and transform operations. Only one of the {@code map} and {@code invokeMap}
+ * maps must be non-null.
+ *
+ * @param cacheCtx Context.
+ * @param map Key-value map to store.
+ * @param invokeMap Invoke map.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param drMap DR map.
+ * @param retval Flag indicating whether a value should be returned.
+ * @return Operation future.
+ */
+ @SuppressWarnings("unchecked")
+ private <K, V> IgniteInternalFuture putAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable Map<? extends K, ? extends V> map,
+ @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
+ @Nullable final Object[] invokeArgs,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
+ final boolean retval
+ ) {
+ try {
+ beforePut(cacheCtx, retval);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+ assert map != null || invokeMap != null;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
+ // Cached entry may be passed only from entry wrapper.
+ final Map<?, ?> map0 = map;
+ final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+
+ if (log.isDebugEnabled())
+ log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+
+ assert map0 != null || invokeMap0 != null;
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+ if (implicit())
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return new GridFinishedFuture<>(ret.success(true));
+ }
+
+ try {
+ Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
+
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
+
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ keySet,
+ opCtx != null ? opCtx.expiry() : null,
+ map0,
+ invokeMap0,
+ invokeArgs,
+ retval,
+ false,
+ CU.filterArray(null),
+ ret,
+ enlisted,
+ drMap,
+ null,
+ opCtx != null && opCtx.skipStore(),
+ false,
+ keepBinary,
+ dataCenterId);
+
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null) {
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ /*read*/invokeMap != null, // Needed to force load from store.
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override public GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/false,
+ retval,
+ /*read*/false,
+ -1L,
+ CU.filterArray(null),
+ /*computeInvoke*/true);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ }
+ else
+ return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key to enlist.
+ * @param val Value.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param entryProcessor Entry processor (for invoke operation).
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for entry values loading.
+ */
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ KeyCacheObject cacheKey,
+ Object val,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable EntryProcessor<K, V, Object> entryProcessor,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ boolean skipStore,
+ final boolean singleRmv,
+ boolean keepBinary,
+ Byte dataCenterId) {
+ try {
+ addActiveCache(cacheCtx);
+
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
+ if (entryProcessor != null)
+ transform = true;
+
+ GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ /*drVer*/drVer,
+ /*drTtl*/-1L,
+ /*drExpireTime*/-1L,
+ ret,
+ /*enlisted*/null,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer,
+ keepBinary);
+
+ if (loadMissed) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ Collections.singleton(cacheKey),
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+ retval,
+ keepBinary,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * Internal routine for <tt>putAll(..)</tt>
+ *
+ * @param cacheCtx Cache context.
+ * @param keys Keys to enlist.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param lookup Value lookup map ({@code null} for remove).
+ * @param invokeMap Map with entry processors for invoke operation.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param enlisted Collection of keys enlisted into this transaction.
+ * @param drPutMap DR put map (optional).
+ * @param drRmvMap DR remove map (optional).
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @param keepBinary Keep binary flag.
+ * @param dataCenterId Optional data center ID.
+ * @return Future for missing values loading.
+ */
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<?> keys,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable Map<?, ?> lookup,
+ @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ Collection<KeyCacheObject> enlisted,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
+ boolean skipStore,
+ final boolean singleRmv,
+ final boolean keepBinary,
+ Byte dataCenterId
+ ) {
+ assert retval || invokeMap == null;
+
+ try {
+ addActiveCache(cacheCtx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ boolean rmv = lookup == null && invokeMap == null;
+
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
+ try {
+ // Set transform flag for transaction.
+ if (invokeMap != null)
+ transform = true;
+
+ Set<KeyCacheObject> missedForLoad = null;
+
+ for (Object key : keys) {
+ if (key == null) {
+ rollback();
+
+ throw new NullPointerException("Null key.");
+ }
+
+ Object val = rmv || lookup == null ? null : lookup.get(key);
+ EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
+
+ GridCacheVersion drVer;
+ long drTtl;
+ long drExpireTime;
+
+ if (drPutMap != null) {
+ GridCacheDrInfo info = drPutMap.get(key);
+
+ assert info != null;
+
+ drVer = info.version();
+ drTtl = info.ttl();
+ drExpireTime = info.expireTime();
+ }
+ else if (drRmvMap != null) {
+ assert drRmvMap.get(key) != null;
+
+ drVer = drRmvMap.get(key);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else if (dataCenterId != null) {
+ drVer = cctx.versions().next(dataCenterId);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else {
+ drVer = null;
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+
+ if (!rmv && val == null && entryProcessor == null) {
+ setRollbackOnly();
+
+ throw new NullPointerException("Null value.");
+ }
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ drVer,
+ drTtl,
+ drExpireTime,
+ ret,
+ enlisted,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer,
+ keepBinary);
+
+ if (loadMissed) {
+ if (missedForLoad == null)
+ missedForLoad = new HashSet<>();
+
+ missedForLoad.add(cacheKey);
+ }
+ }
+
+ if (missedForLoad != null) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ missedForLoad,
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+ retval,
+ keepBinary,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param retval Return value flag.
+ * @param lockOnly Lock only flag.
+ * @param filter Filter.
+ * @param drVer DR version.
+ * @param drTtl DR ttl.
+ * @param drExpireTime DR expire time.
+ * @param ret Return value.
+ * @param enlisted Enlisted keys collection.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param needVal {@code True} if value is needed.
+ * @param needReadVer {@code True} if need read entry version.
+ * @return {@code True} if entry value should be loaded.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ final KeyCacheObject cacheKey,
+ @Nullable final Object val,
+ @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
+ @Nullable final Object[] invokeArgs,
+ @Nullable final ExpiryPolicy expiryPlc,
+ final boolean retval,
+ final boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheVersion drVer,
+ final long drTtl,
+ long drExpireTime,
+ final GridCacheReturn ret,
+ @Nullable final Collection<KeyCacheObject> enlisted,
+ boolean skipStore,
+ boolean singleRmv,
+ boolean hasFilters,
+ final boolean needVal,
+ boolean needReadVer,
+ boolean keepBinary
+ ) throws IgniteCheckedException {
+ boolean loadMissed = false;
+
+ final boolean rmv = val == null && entryProcessor == null;
+
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+ IgniteTxEntry txEntry = entry(txKey);
+
+ // First time access.
+ if (txEntry == null) {
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
+
+ try {
+ entry.unswap(false);
+
+ // Check if lock is being explicitly acquired by the same thread.
+ if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+ entry.lockedByThread(threadId, xidVer)) {
+ throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+ "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+ ", entry=" + entry +
+ ", xidVer=" + xidVer +
+ ", threadId=" + threadId +
+ ", locNodeId=" + cctx.localNodeId() + ']');
+ }
+
+ CacheObject old = null;
+ GridCacheVersion readVer = null;
+
+ if (optimistic() && !implicit()) {
+ try {
+ if (needReadVer) {
+ EntryGetResult res = primaryLocal(entry) ?
+ entry.innerGetVersioned(
+ null,
+ this,
+ /*swap*/false,
+ /*unmarshal*/retval || needVal,
+ /*metrics*/retval,
+ /*events*/retval,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null,
+ keepBinary,
+ null) : null;
+
+ if (res != null) {
+ old = res.value();
+ readVer = res.version();
+ }
+ }
+ else {
+ old = entry.innerGet(
+ null,
+ this,
+ /*swap*/false,
+ /*read-through*/false,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null,
+ keepBinary);
+ }
+ }
+ catch (ClusterTopologyCheckedException e) {
+ entry.context().evicts().touch(entry, topologyVersion());
+
+ throw e;
+ }
+ }
+ else
+ old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+
+ final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+ if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+ ret.set(cacheCtx, old, false, keepBinary);
+
+ if (!readCommitted()) {
+ if (optimistic() && serializable()) {
+ txEntry = addEntry(op,
+ old,
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+ }
+ else {
+ txEntry = addEntry(READ,
+ old,
+ null,
+ null,
+ entry,
+ null,
+ CU.empty0(),
+ false,
+ -1L,
+ -1L,
+ null,
+ skipStore,
+ keepBinary);
+ }
+
+ txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
+ }
+
+ if (readCommitted())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ break; // While.
+ }
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+
+ if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ if (enlisted != null)
+ enlisted.add(cacheKey);
+
+ if (!pessimistic() && !implicit()) {
+ txEntry.markValid();
+
+ if (old == null) {
+ if (needVal)
+ loadMissed = true;
+ else {
+ assert !implicit() || !transform : this;
+ assert txEntry.op() != TRANSFORM : txEntry;
+
+ if (retval)
+ ret.set(cacheCtx, null, true, keepBinary);
+ else
+ ret.success(true);
+ }
+ }
+ else {
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
+
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true, keepBinary);
+ else {
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : txEntry;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version " +
+ "[err=" + ex.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(txEntry, old, ret, ver);
+ }
+ else
+ ret.success(true);
+ }
+ }
+ }
+ // Pessimistic.
+ else {
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true, keepBinary);
+ else
+ ret.success(true);
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction putAll0 method: " + entry);
+ }
+ }
+ }
+ else {
+ if (entryProcessor == null && txEntry.op() == TRANSFORM)
+ throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+ "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
+
+ GridCacheEntryEx entry = txEntry.cached();
+
+ CacheObject v = txEntry.value();
+
+ boolean del = txEntry.op() == DELETE && rmv;
+
+ if (!del) {
+ if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
+ ret.set(cacheCtx, v, false, keepBinary);
+
+ return loadMissed;
+ }
+
+ GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+ v != null ? UPDATE : CREATE;
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+
+ if (enlisted != null)
+ enlisted.add(cacheKey);
+
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(txEntry, txEntry.value(), ret, ver);
+ }
+ }
+
+ if (!pessimistic()) {
+ txEntry.markValid();
+
+ if (retval && !transform)
+ ret.set(cacheCtx, v, true, keepBinary);
+ else
+ ret.success(true);
+ }
+ }
+
+ return loadMissed;
+ }
+
+ /**
+ * Enlists the given keys for removal in this transaction and returns a future for the operation result.
+ * <p>
+ * After the keys are enlisted via {@code enlistWrite(...)}:
+ * <ul>
+ * <li>pessimistic transactions acquire locks with {@code txLockAsync(...)} and then run
+ * {@code postLockWrite(...)};</li>
+ * <li>implicit non-pessimistic transactions are committed with {@code commitNearTxLocalAsync()};</li>
+ * <li>otherwise the returned future completes together with the enlist future.</li>
+ * </ul>
+ *
+ * @param cacheCtx Cache context.
+ * @param entryTopVer Topology version passed through to {@code enlistWrite(...)} (may be {@code null}).
+ * @param keys Keys to remove (asserted to be {@code null} when {@code drMap} is provided).
+ * @param drMap DR map; when not {@code null} its key set is used as the keys to remove.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for asynchronous remove.
+ */
+ @SuppressWarnings("unchecked")
+ private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable final Collection<? extends K> keys,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter,
+ boolean singleRmv) {
+ // Fail fast with a finished future if the update check throws.
+ try {
+ checkUpdatesAllowed(cacheCtx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+
+ if (retval)
+ needReturnValue(true);
+
+ // Keys come either from the DR map or from the explicit key collection, never both.
+ final Collection<?> keys0;
+
+ if (drMap != null) {
+ assert keys == null;
+
+ keys0 = drMap.keySet();
+ }
+ else
+ keys0 = keys;
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ // Data center ID is taken from the per-call operation context, if it has one.
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
+ assert keys0 != null;
+
+ if (log.isDebugEnabled())
+ log.debug(S.toString("Called removeAllAsync(...)",
+ "tx", this, false,
+ "keys", keys0, true,
+ "implicit", implicit, false,
+ "retval", retval, false));
+
+ try {
+ checkValid();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ // Nothing to remove: commit an implicit transaction right away and report success.
+ if (F.isEmpty(keys0)) {
+ if (implicit()) {
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ return new GridFinishedFuture<>(ret.success(true));
+ }
+
+ init();
+
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>();
+
+ ExpiryPolicy plc;
+
+ final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+ // Expiry policy from the operation context is only used when filters are present.
+ if (!F.isEmpty(filters))
+ plc = opCtx != null ? opCtx.expiry() : null;
+ else
+ plc = null;
+
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ keys0,
+ plc,
+ /*lookup map*/null,
+ /*invoke map*/null,
+ /*invoke arguments*/null,
+ retval,
+ /*lock only*/false,
+ filters,
+ ret,
+ enlisted,
+ null,
+ drMap,
+ opCtx != null && opCtx.skipStore(),
+ singleRmv,
+ keepBinary,
+ dataCenterId
+ );
+
+ if (log.isDebugEnabled())
+ log.debug("Remove keys: " + enlisted);
+
+ // Acquire locks only after having added operation to the write set.
+ // Otherwise, during rollback we will not know whether locks need
+ // to be rolled back.
+ if (pessimistic()) {
+ // In pessimistic mode the enlist future is expected to be absent or already completed.
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null) {
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
+
+ long timeout = remainingTime();
+
+ // A remaining time of -1 is reported to the caller as a timeout.
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ // Post-lock closure: runs postLockWrite(...) for the enlisted keys and returns the shared result.
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for remove on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/true,
+ retval,
+ /*read*/false,
+ -1L,
+ filters,
+ /*computeInvoke*/false);
+
+ return ret;
+ }
+ };
+
+ // Lock future already completed: apply the post-lock closure synchronously.
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ // Hand the lock failure to the closure; any exception it throws becomes the future result.
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ else {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
+
+ return nonInterruptable(commitNearTxLocalAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
+
+ // Result is built from the implicit transaction result (implicitRes).
+ return new GridCacheReturn(cacheCtx, true, keepBinary,
+ implicitRes.value(), implicitRes.success());
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ // Commit failed: start rollback and propagate the original error.
+ rollbackNearTxLocalAsync();
+
+ throw e;
+ }
+ }
+ }));
+ }
+ else {
+ // Explicit non-pessimistic transaction: result is ready once the enlist future completes.
+ return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+ throws IgniteCheckedException {
+ f.get();
+
+ return ret;
+ }
+ }));
+ }
+ }
+ }
+
+ /**
+ * Reads the given keys within this transaction.
+ * <p>
+ * Keys are first enlisted via {@code enlistRead(...)}. For pessimistic transactions that are not
+ * read-committed (and when values are not skipped) locks are acquired with {@code txLockAsync(...)} and
+ * values are re-read after the lock is obtained. Otherwise keys that were missed during enlist are
+ * resolved through {@code checkMissed(...)}.
+ * <p>
+ * If an {@link IgniteCheckedException} is thrown, the transaction is marked rollback-only and a finished
+ * future with that error is returned.
+ *
+ * @param cacheCtx Cache context.
+ * @param entryTopVer Topology version passed to {@code enlistRead(...)}; also used as a fallback when
+ * {@code topologyVersionSnapshot()} returns {@code null} (may be {@code null}).
+ * @param keys Keys to get.
+ * @param deserializeBinary Deserialize binary flag.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects
+ * @param skipStore Skip store flag.
+ * @param needVer If {@code true}, entries are read with {@code innerGetVersioned(...)}; the flag is also
+ * passed on to {@code addResult(...)} and {@code checkMissed(...)}.
+ * @return Future for this get.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
+ final GridCacheContext cacheCtx,
+ @Nullable final AffinityTopologyVersion entryTopVer,
+ Collection<KeyCacheObject> keys,
+ final boolean deserializeBinary,
+ final boolean skipVals,
+ final boolean keepCacheObjects,
+ final boolean skipStore,
+ final boolean needVer) {
+ if (F.isEmpty(keys))
+ return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+
+ init();
+
+ int keysCnt = keys.size();
+
+ boolean single = keysCnt == 1;
+
+ try {
+ checkValid();
+
+ final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+
+ final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+
+ final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
+ entryTopVer,
+ keys,
+ expiryPlc,
+ retMap,
+ missed,
+ keysCnt,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
+ needVer);
+
+ // Single key with nothing missed during enlist: the return map is already complete.
+ if (single && missed.isEmpty())
+ return new GridFinishedFuture<>(retMap);
+
+ // Handle locks.
+ if (pessimistic() && !readCommitted() && !skipVals) {
+ // No per-call expiry policy: fall back to the one provided by the cache context.
+ if (expiryPlc == null)
+ expiryPlc = cacheCtx.expiry();
+
+ long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
+ long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
+
+ long timeout = remainingTime();
+
+ // A remaining time of -1 is reported to the caller as a timeout.
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
+ timeout,
+ this,
+ true,
+ true,
+ isolation,
+ isInvalidate(),
+ createTtl,
+ accessTtl);
+
+ // Final copy so the policy can be captured by the anonymous closure below.
+ final ExpiryPolicy expiryPlc0 = expiryPlc;
+
+ PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+ @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+
+ // Load keys only after the locks have been acquired.
+ for (KeyCacheObject cacheKey : lockKeys) {
+ K keyVal = (K)
+ (keepCacheObjects ? cacheKey :
+ cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
+
+ if (retMap.containsKey(keyVal))
+ // We already have a return value.
+ continue;
+
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+ IgniteTxEntry txEntry = entry(txKey);
+
+ assert txEntry != null;
+
+ // Check if there is cached value.
+ // The loop retries with a fresh cache entry when the current one has been removed.
+ while (true) {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ CacheObject val = null;
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ try {
+ // First entry processor is passed to the read only when the tx entry has processors
+ // and EVT_CACHE_OBJECT_READ is recordable.
+ Object transformClo =
+ (!F.isEmpty(txEntry.entryProcessors()) &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.entryProcessors()) : null;
+
+ if (needVer) {
+ getRes = cached.innerGetVersioned(
+ null,
+ GridNearTxLocal.this,
+ /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
+ /*unmarshal*/true,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(GridNearTxLocal.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary(),
+ null);
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else{
+ val = cached.innerGet(
+ null,
+ GridNearTxLocal.this,
+ cacheCtx.isSwapOrOffheapEnabled(),
+ /*read-through*/false,
+ /*metrics*/true,
+ /*events*/!skipVals,
+ /*temporary*/false,
+ CU.subjectId(GridNearTxLocal.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
+
+ // If value is in cache and passed the filter.
+ if (val != null) {
+ missed.remove(cacheKey);
+
+ txEntry.setAndMarkValid(val);
+
+ // The returned value reflects entry processors already enlisted for this key.
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ cacheCtx.addResult(retMap,
+ cacheKey,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+
+ if (readVer != null)
+ txEntry.entryReadVersion(readVer);
+ }
+
+ // Even though we bring the value back from lock acquisition,
+ // we still need to recheck primary node for consistent values
+ // in case of concurrent transactional locks.
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed exception in get postLock (will retry): " +
+ cached);
+
+ txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
+ }
+ }
+ }
+
+ // For local caches the keys still missed after the post-lock read go through checkMissed(...).
+ if (!missed.isEmpty() && cacheCtx.isLocal()) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ retMap,
+ missed,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
+ needVer,
+ expiryPlc0);
+ }
+
+ // Nothing was loaded additionally; values collected above are already in retMap.
+ return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+ }
+ };
+
+ // Merges the map produced by the post-lock step into retMap and returns retMap.
+ FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+ @Override Map<K, V> finish(Map<K, V> loaded) {
+ retMap.putAll(loaded);
+
+ return retMap;
+ }
+ };
+
+ // Lock future already completed: run the post-lock and finish closures without extra chaining.
+ if (fut.isDone()) {
+ try {
+ IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+
+ return fut1.isDone() ?
+ new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
+ new GridEmbeddedFuture<>(finClos, fut1);
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ // Hand the lock failure to the closure; any exception it throws becomes the future result.
+ try {
+ return plc2.apply(false, e);
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return new GridEmbeddedFuture<>(
+ fut,
+ plc2,
+ finClos);
+ }
+ }
+ else {
+ assert optimistic() || readCommitted() || skipVals;
+
+ if (!missed.isEmpty()) {
+ // Unless read-committed, drop missed keys that already have a value in the return map.
+ if (!readCommitted())
+ for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
+ KeyCacheObject cacheKey = it.next();
+
+ K keyVal =
+ (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
+
+ if (retMap.containsKey(keyVal))
+ it.remove();
+ }
+
+ if (missed.isEmpty())
+ return new GridFinishedFuture<>(retMap);
+
+ // Topology version preference: snapshot, then the explicit one, then topologyVersion().
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ retMap,
+ missed,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
+ needVer,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>(retMap);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ setRollbackOnly();
+
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * Enlists the given keys for read in this transaction.
+ * <p>
+ * For every key the transaction entry is looked up first. If one exists, the value is taken from it
+ * (or read from the cached entry when the tx entry has no value yet). On first access the cache entry
+ * is read directly unless the transaction has to wait for a lock, in which case the key is put into
+ * {@code missed}. Keys without a value are recorded in {@code missed} together with the entry version.
+ *
+ * @param cacheCtx Cache context.
+ * @param entryTopVer Topology version to use; when {@code null}, {@code topologyVersion()} is used.
+ * @param keys Key to enlist.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param map Return map.
+ * @param missed Map of missed keys.
+ * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+ * @param deserializeBinary Deserialize binary flag.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
+ * @param skipStore Skip store flag.
+ * @param needVer If {@code true}, a version is resolved for each returned value and versioned reads
+ * are used.
+ * @throws IgniteCheckedException If failed.
+ * @return Enlisted keys.
+ */
+ @SuppressWarnings({"RedundantTypeArguments"})
+ private <K, V> Collection<KeyCacheObject> enlistRead(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<KeyCacheObject> keys,
+ @Nullable ExpiryPolicy expiryPlc,
+ Map<K, V> map,
+ Map<KeyCacheObject, GridCacheVersion> missed,
+ int keysCnt,
+ boolean deserializeBinary,
+ boolean skipVals,
+ boolean keepCacheObjects,
+ boolean skipStore,
+ final boolean needVer
+ ) throws IgniteCheckedException {
+ assert !F.isEmpty(keys);
+ assert keysCnt == keys.size();
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ boolean single = keysCnt == 1;
+
+ // Created lazily on first access to a key that is not yet in the transaction.
+ Collection<KeyCacheObject> lockKeys = null;
+
+ AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
+
+ // Read version is tracked for optimistic serializable transactions or when the caller asked for versions.
+ boolean needReadVer = (serializable() && optimistic()) || needVer;
+
+ // In this loop we cover only read-committed or optimistic transactions.
+ // Transactions that are pessimistic and not read-committed are covered
+ // outside of this loop.
+ for (KeyCacheObject key : keys) {
+ if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
+ addActiveCache(cacheCtx);
+
+ IgniteTxKey txKey = cacheCtx.txKey(key);
+
+ // Check write map (always check writes first).
+ IgniteTxEntry txEntry = entry(txKey);
+
+ // Either non-read-committed or there was a previous write.
+ if (txEntry != null) {
+ CacheObject val = txEntry.value();
+
+ if (txEntry.hasValue()) {
+ // The returned value reflects entry processors already enlisted for this key.
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ if (val != null) {
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ // Operation other than READ: report the GET_ENTRY_INVALID_VER_UPDATED marker version.
+ if (txEntry.op() != READ)
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+ else {
+ ver = txEntry.entryReadVersion();
+
+ // Pessimistic tx without a recorded read version: take the version from the cached
+ // entry (DHT version for near entries), retrying if the entry was removed.
+ if (ver == null && pessimistic()) {
+ while (true) {
+ try {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ ver = cached.isNear() ?
+ ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
+ }
+
+ // Still no version: fall back to the GET_ENTRY_INVALID_VER_AFTER_GET marker version.
+ if (ver == null) {
+ assert optimistic() && repeatableRead() : this;
+
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+ }
+ }
+
+ assert ver != null;
+ }
+
+ cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
+ ver, 0, 0);
+ }
+ }
+ else {
+ // Tx entry has no value yet; this is only expected for TRANSFORM operations.
+ assert txEntry.op() == TRANSFORM;
+
+ // The loop retries with a fresh cache entry when the current one has been removed.
+ while (true) {
+ try {
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ Object transformClo =
+ (txEntry.op() == TRANSFORM &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.entryProcessors()) : null;
+
+ if (needVer) {
+ getRes = txEntry.cached().innerGetVersioned(
+ null,
+ this,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary(),
+ null);
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else {
+ val = txEntry.cached().innerGet(
+ null,
+ this,
+ /*swap*/true,
+ /*read-through*/false,
+ /*metrics*/true,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
+
+ if (val != null) {
+ // Record the read value on the tx entry before entry processors are applied to it.
+ if (!readCommitted() && !skipVals)
+ txEntry.readValue(val);
+
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+ }
+ else
+ missed.put(key, txEntry.cached().version());
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
+ }
+ }
+ // First time access within transaction.
+ else {
+ // Lock keys are collected only when values are not skipped; a single key uses a singleton collection.
+ if (lockKeys == null && !skipVals)
+ lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
+
+ if (!single && !skipVals)
+ lockKeys.add(key);
+
+ // The loop re-resolves the cache entry and retries when the entry has been removed.
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+
+ try {
+ GridCacheVersion ver = entry.version();
+
+ CacheObject val = null;
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ // NOTE(review): '&&' binds tighter than '||', so this condition is evaluated as
+ // !pessimistic() || (readCommitted() && !skipVals) - confirm this grouping is intended.
+ if (!pessimistic() || readCommitted() && !skipVals) {
+ IgniteCacheExpiryPolicy accessPlc =
+ optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
+
+ if (needReadVer) {
+ // Versioned read is performed only when primaryLocal(entry) is true.
+ getRes = primaryLocal(entry) ?
+ entry.innerGetVersioned(
+ null,
+ this,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc,
+ !deserializeBinary,
+ null) : null;
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else {
+ val = entry.innerGet(
+ null,
+ this,
+ /*swap*/true,
+ /*read-through*/false,
+ /*metrics*/true,
+ /*event*/true,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc,
+ !deserializeBinary);
+ }
+
+ if (val != null) {
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+ }
+ else
+ missed.put(key, ver);
+ }
+ else
+ // We must wait for the lock in pessimistic mode.
+ missed.put(key, ver);
+
+ // Unless read-committed or skipping values, register a READ entry in the transaction.
+ if (!readCommitted() && !skipVals) {
+ txEntry = addEntry(READ,
+ val,
+ null,
+ null,
+ entry,
+ expiryPlc,
+ null,
+ true,
+ -1L,
+ -1L,
+ null,
+ skipStore,
+ !deserializeBinary);
+
+ // As optimization, mark as checked immediately
+ // for non-pessimistic if value is not null.
+ if (val != null && !pessimistic()) {
+ txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(readVer);
+ }
+ }
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+ }
+ finally {
+ // Read-committed: for a near cache, an entry whose partition belongs to the local node is
+ // marked obsolete and removed; for other caches the entry is touched via the eviction manager.
+ if (entry != null && readCommitted()) {
+ if (cacheCtx.isNear()) {
+ if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
+ if (entry.markObsolete(xidVer))
+ cacheCtx.cache().removeEntry(entry);
+ }
+ }
+ else
+ entry.context().evicts().touch(entry, topVer);
+ }
+ }
+ }
+ }
+ }
+
+ // Never returns null: an empty list is returned when no lock keys were collected.
+ return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
+ }
+
+ /**
+ * Builds a closure that applies each loaded value to the corresponding transaction entry and to the
+ * operation return value, then delegates the actual load to the overloaded {@code loadMissing(...)}.
+ *
+ * @param cacheCtx Cache context.
+ * @param topVer Topology version passed to the underlying load.
+ * @param keys Keys to load.
+ * @param filter Filter.
+ * @param ret Return value.
+ * @param needReadVer Read version flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param readThrough Read through flag.
+ * @param retval Return value flag.
+ * @param keepBinary Keep binary flag.
+ * @param expiryPlc Expiry policy.
+ * @return Load future.
+ */
+ private IgniteInternalFuture<Void> loadMissing(
+ final GridCacheContext cacheCtx,
+ final AffinityTopologyVersion topVer,
+ final Set<KeyCacheObject> keys,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ final boolean needReadVer,
+ final boolean singleRmv,
+ final boolean hasFilters,
+ final boolean readThrough,
+ final boolean retval,
+ final boolean keepBinary,
+ final ExpiryPolicy expiryPlc) {
+ // Closure handed to the underlying load; it receives the key, the loaded value and its version.
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+ new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+ @Override public void apply(KeyCacheObject key,
+ @Nullable Object val,
+ @Nullable GridCacheVersion loadVer) {
+ if (log.isDebugEnabled())
+ log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+
+ // The key is expected to be enlisted in this transaction already.
+ IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
+
+ assert e != null;
+
+ if (needReadVer) {
+ assert loadVer != null;
+
+ // Single remove with a non-null loaded value records SER_READ_NOT_EMPTY_VER instead of the loaded version.
+ e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ }
+
+ if (singleRmv) {
+ // Single remove loads with skipVals (see the call below); the value is either null or Boolean.TRUE.
+ assert !hasFilters && !retval;
+ assert val == null || Boolean.TRUE.equals(val) : val;
+
+ ret.set(cacheCtx, null, val != null, keepBinary);
+ }
+ else {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+ if (e.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ // Record the loaded value on the tx entry before computing the invoke result.
+ e.readValue(cacheVal);
+
+ try {
+ ver = e.cached().version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : e;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+
+ // Entry was removed: the invoke result is added without an entry version.
+ ver = null;
+ }
+
+ addInvokeResult(e, cacheVal, ret, ver);
+ }
+ else {
+ boolean success;
+
+ if (hasFilters) {
+ success = isAll(e.context(), key, cacheVal, filter);
+
+ // Filter did not pass: the loaded value is set on the tx entry.
+ if (!success)
+ e.value(cacheVal, false, false);
+ }
+ else
+ success = true;
+
+ ret.set(cacheCtx, cacheVal, success, keepBinary);
+ }
+ }
+ }
+ };
+
+ return loadMissing(
+ cacheCtx,
+ topVer,
+ readThrough,
+ /*async*/true,
+ keys,
+ /*skipVals*/singleRmv,
+ needReadVer,
+ keepBinary,
+ expiryPlc,
+ c);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param loadFut Missing keys load future.
+ * @param ret Future result.
+ * @param keepBinary Keep binary flag.
+ * @return Future.
+ */
+ private IgniteInternalFuture optimisticPutFuture(
+ final GridCacheContext cacheCtx,
+ IgniteInternalFuture<Void> loadFut,
+ final GridCacheReturn ret,
+ final boolean keepBinary
+ ) {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
+
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return nonInterruptable(commitNearTxLocalAsync().chain(
+ new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
+
+ Object res = implicitRes.value();
+
+ if (implicitRes.invokeResult()) {
+ assert res == null || res instanceof Map : implicitRes;
- /**
- * @return If backup check was requested.
- */
- public boolean needCheckBackup() {
- return needCheckBackup != null;
- }
+ res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary);
+ }
- /**
- * @return {@code True} if transaction contains at least one near cache key mapped to the local node.
- */
- public boolean nearLocallyMapped() {
- return nearLocallyMapped;
- }
+ return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ if (!(e instanceof NodeStoppingException))
+ rollbackNearTxLocalAsync();
- /**
- * @param nearLocallyMapped {@code True} if transaction contains near key mapped to the local
<TRUNCATED>