You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 15:16:03 UTC
[1/5] ignite git commit: ignite-4768 txs
Repository: ignite
Updated Branches:
refs/heads/ignite-4768-1 [created] 5523eac7d
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index f5687a0..0cf1d67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -18,18 +18,12 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
-import java.util.Map;
-import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.jetbrains.annotations.Nullable;
@@ -59,113 +53,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
public void userRollback() throws IgniteCheckedException;
/**
- * @param cacheCtx Cache context.
- * @param keys Keys to get.
- * @param deserializeBinary Deserialize binary flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects
- * @param skipStore Skip store flag.
- * @return Future for this get.
- */
- public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
- boolean deserializeBinary,
- boolean skipVals,
- boolean keepCacheObjects,
- boolean skipStore,
- boolean needVer);
-
- /**
- * @param cacheCtx Cache context.
- * @param map Map to put.
- * @param retval Flag indicating whether a value should be returned.
- * @return Future for put operation.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Map<? extends K, ? extends V> map,
- boolean retval);
-
- /**
- * @param cacheCtx Cache context.
- * @param key Key.
- * @param val Value.
- * @param retval Return value flag.
- * @param filter Filter.
- * @return Future for put operation.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- V val,
- boolean retval,
- CacheEntryPredicate filter);
-
- /**
- * @param cacheCtx Cache context.
- * @param key Key.
- * @param entryProcessor Entry processor.
- * @param invokeArgs Optional arguments for entry processor.
- * @return Operation future.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- EntryProcessor<K, V, Object> entryProcessor,
- Object... invokeArgs);
-
- /**
- * @param cacheCtx Cache context.
- * @param map Entry processors map.
- * @param invokeArgs Optional arguments for entry processor.
- * @return Operation future.
- */
- public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
- Object... invokeArgs);
-
- /**
- * @param cacheCtx Cache context.
- * @param keys Keys to remove.
- * @param retval Flag indicating whether a value should be returned.
- * @param filter Filter.
- * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
- * @return Future for asynchronous remove.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<? extends K> keys,
- boolean retval,
- CacheEntryPredicate filter,
- boolean singleRmv);
-
- /**
- * @param cacheCtx Cache context.
- * @param drMap DR map to put.
- * @return Future for DR put operation.
- */
- public IgniteInternalFuture<?> putAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheDrInfo> drMap);
-
- /**
- * @param cacheCtx Cache context.
- * @param drMap DR map.
- * @return Future for asynchronous remove.
- */
- public IgniteInternalFuture<?> removeAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheVersion> drMap);
-
- /**
* Finishes transaction (either commit or rollback).
*
* @param commit {@code True} if commit, {@code false} if rollback.
@@ -173,27 +60,4 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
* @throws IgniteCheckedException If finish failed.
*/
public boolean finish(boolean commit) throws IgniteCheckedException;
-
- /**
- * @param cacheCtx Cache context.
- * @param readThrough Read through flag.
- * @param async if {@code True}, then loading will happen in a separate thread.
- * @param keys Keys.
- * @param skipVals Skip values flag.
- * @param needVer If {@code true} version is required for loaded values.
- * @param c Closure to be applied for loaded values.
- * @param expiryPlc Expiry policy.
- * @return Future with {@code True} value if loading took place.
- */
- public IgniteInternalFuture<Void> loadMissing(
- GridCacheContext cacheCtx,
- AffinityTopologyVersion topVer,
- boolean readThrough,
- boolean async,
- Collection<KeyCacheObject> keys,
- boolean skipVals,
- boolean needVer,
- boolean keepBinary,
- final ExpiryPolicy expiryPlc,
- GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index df3bad2..ff4a4e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -206,54 +206,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private TxDeadlockDetection txDeadlockDetection;
/** {@inheritDoc} */
- @Override protected void onKernalStart0(boolean reconnect) {
- if (reconnect)
- return;
-
- cctx.gridEvents().addLocalEventListener(
- new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent;
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- UUID nodeId = discoEvt.eventNode().id();
-
- cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
-
- if (txFinishSync != null)
- txFinishSync.onNodeLeft(nodeId);
-
- for (TxDeadlockFuture fut : deadlockDetectFuts.values())
- fut.onNodeLeft(nodeId);
-
- for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
- Object obj = entry.getValue();
-
- if (obj instanceof GridCacheReturnCompletableWrapper &&
- nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
- removeTxReturn(entry.getKey());
- }
- }
- },
- EVT_NODE_FAILED, EVT_NODE_LEFT);
-
- this.txDeadlockDetection = new TxDeadlockDetection(cctx);
-
- cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
-
- for (IgniteInternalTx tx : idMap.values()) {
- if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) {
- if (log.isDebugEnabled())
- log.debug("Remaining transaction from left node: " + tx);
-
- salvageTx(tx, true, USER_FINISH);
- }
- }
- }
-
- /** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridIO().removeMessageListener(TOPIC_TX);
}
@@ -293,6 +245,39 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
}
};
+
+ cctx.gridEvents().addLocalEventListener(
+ new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt instanceof DiscoveryEvent;
+ assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID nodeId = discoEvt.eventNode().id();
+
+ cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
+
+ if (txFinishSync != null)
+ txFinishSync.onNodeLeft(nodeId);
+
+ for (TxDeadlockFuture fut : deadlockDetectFuts.values())
+ fut.onNodeLeft(nodeId);
+
+ for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof GridCacheReturnCompletableWrapper &&
+ nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
+ removeTxReturn(entry.getKey());
+ }
+ }
+ },
+ EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ this.txDeadlockDetection = new TxDeadlockDetection(cctx);
+
+ cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
}
/** {@inheritDoc} */
@@ -461,7 +446,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param txSize Expected transaction size.
* @return New transaction.
*/
- public IgniteTxLocalAdapter newTx(
+ public GridNearTxLocal newTx(
boolean implicit,
boolean implicitSingle,
@Nullable GridCacheContext sysCacheCtx,
@@ -672,13 +657,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cctx Cache context.
* @return Transaction for current thread.
*/
- @SuppressWarnings({"unchecked"})
- public <T> T threadLocalTx(GridCacheContext cctx) {
+ public GridNearTxLocal threadLocalTx(GridCacheContext cctx) {
IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
- return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit() ? (T)tx : null;
+ if (tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit()) {
+ assert tx instanceof GridNearTxLocal : tx;
+
+ return (GridNearTxLocal)tx;
+ }
+
+ return null;
}
/**
@@ -747,48 +738,53 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Local transaction.
*/
@Nullable public IgniteInternalTx localTxx() {
- IgniteInternalTx tx = txx();
+ IgniteInternalTx tx = tx();
return tx != null && tx.local() ? tx : null;
}
/**
- * @return Transaction for current thread.
- */
- @SuppressWarnings({"unchecked"})
- public IgniteInternalTx txx() {
- return tx();
- }
-
- /**
* @return User transaction for current thread.
*/
- @Nullable public IgniteInternalTx userTx() {
+ @Nullable public GridNearTxLocal userTx() {
IgniteInternalTx tx = txContext();
- if (tx != null && tx.user() && tx.state() == ACTIVE)
- return tx;
+ if (activeUserTx(tx))
+ return (GridNearTxLocal)tx;
tx = tx(null, Thread.currentThread().getId());
- return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+ if (activeUserTx(tx))
+ return (GridNearTxLocal)tx;
+
+ return null;
}
/**
+ * @param cctx Cache context.
* @return User transaction for current thread.
*/
- @Nullable public IgniteInternalTx userTx(GridCacheContext cctx) {
+ @Nullable GridNearTxLocal userTx(GridCacheContext cctx) {
IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
- return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+ if (activeUserTx(tx))
+ return (GridNearTxLocal)tx;
+
+ return null;
}
/**
- * @return User transaction.
+ * @param tx Transaction.
+ * @return {@code True} if given transaction is not {@code null}, is a user transaction and is in {@code ACTIVE} state.
*/
- @SuppressWarnings({"unchecked"})
- @Nullable public <T extends IgniteTxLocalEx> T userTxx() {
- return (T)userTx();
+ private boolean activeUserTx(@Nullable IgniteInternalTx tx) {
+ if (tx != null && tx.user() && tx.state() == ACTIVE) {
+ assert tx instanceof GridNearTxLocal : tx;
+
+ return true;
+ }
+
+ return false;
}
/**
@@ -1984,7 +1980,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (tx instanceof GridDistributedTxRemoteAdapter) {
IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
- rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(),
+ rmtTx.doneRemote(tx.xidVersion(),
+ Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList(),
Collections.<GridCacheVersion>emptyList());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 6134b9f..8ffec00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -51,7 +52,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
/** Wrapped transaction. */
@GridToStringInclude
- private IgniteInternalTx tx;
+ private GridNearTxLocal tx;
/** Gateway. */
@GridToStringExclude
@@ -75,7 +76,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
* @param cctx Shared context.
* @param async Async flag.
*/
- public TransactionProxyImpl(IgniteInternalTx tx, GridCacheSharedContext<K, V> cctx, boolean async) {
+ public TransactionProxyImpl(GridNearTxLocal tx, GridCacheSharedContext<K, V> cctx, boolean async) {
assert tx != null;
assert cctx != null;
@@ -87,7 +88,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
/**
* @return Transaction.
*/
- public IgniteInternalTx tx() {
+ public GridNearTxLocal tx() {
return tx;
}
@@ -316,7 +317,9 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() {
@Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException {
- return fut.get().proxy();
+ fut.get();
+
+ return TransactionProxyImpl.this;
}
});
@@ -330,7 +333,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- tx = (IgniteInternalTx)in.readObject();
+ tx = (GridNearTxLocal)in.readObject();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
index bbb1d4e..77a8d36 100644
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java
@@ -22,6 +22,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.transactions.Transaction;
import org.hibernate.cache.CacheException;
@@ -250,15 +251,15 @@ public class HibernateReadWriteAccessStrategy extends HibernateAccessStrategyAda
if (ctx.unlocked(key)) { // Finish transaction if last key is unlocked.
txCtx.remove();
- Transaction tx = cache.tx();
+ GridNearTxLocal tx = cache.tx();
assert tx != null;
try {
- tx.commit();
+ tx.proxy().commit();
}
finally {
- tx.close();
+ tx.proxy().close();
}
assert cache.tx() == null;
@@ -275,10 +276,10 @@ public class HibernateReadWriteAccessStrategy extends HibernateAccessStrategyAda
if (ctx != null) {
txCtx.remove();
- Transaction tx = cache.tx();
+ GridNearTxLocal tx = cache.tx();
if (tx != null)
- tx.rollback();
+ tx.proxy().rollback();
}
}
catch (IgniteException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
index f581ebb..5047491 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.jta.CacheTmLookup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
@@ -151,7 +152,7 @@ public class CacheJtaManager extends CacheJtaManagerAdapter {
Transaction jtaTx = jtaTm.getTransaction();
if (jtaTx != null) {
- IgniteInternalTx tx = cctx.tm().userTx();
+ GridNearTxLocal tx = cctx.tm().userTx();
if (tx == null) {
TransactionConfiguration tCfg = cctx.kernalContext().config()
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
index f43981e..611a366 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
@@ -27,6 +27,7 @@ import javax.transaction.xa.Xid;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -51,7 +52,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
private static final Xid[] NO_XID = new Xid[] {};
/** Cache transaction. */
- private IgniteInternalTx cacheTx;
+ private GridNearTxLocal cacheTx;
/** */
private Xid xid;
@@ -60,7 +61,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
* @param cacheTx Cache jta.
* @param ctx Kernal context.
*/
- public CacheJtaResource(IgniteInternalTx cacheTx, GridKernalContext ctx) {
+ CacheJtaResource(GridNearTxLocal cacheTx, GridKernalContext ctx) {
assert cacheTx != null;
assert ctx != null;
@@ -291,7 +292,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
*
* @return {@code true} if jta was already committed or rolled back.
*/
- public boolean isFinished() {
+ boolean isFinished() {
TransactionState state = cacheTx.state();
return state == COMMITTED || state == ROLLED_BACK;
[4/5] ignite git commit: ignite-4768 txs
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8ed749c..aa94ea7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -18,24 +18,37 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.cache.Cache;
+import javax.cache.CacheException;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -46,31 +59,52 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -135,6 +169,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** */
private boolean hasRemoteLocks;
+ /** */
+ @GridToStringExclude
+ private TransactionProxyImpl proxy;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -213,99 +251,2174 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return cctx.localNodeId();
}
- /** {@inheritDoc} */
- @Override protected IgniteUuid nearFutureId() {
- assert false : "nearFutureId should not be called for colocated transactions.";
+ /** {@inheritDoc} */
+ @Override protected IgniteUuid nearFutureId() {
+ assert false : "nearFutureId should not be called for colocated transactions.";
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<Boolean> addReader(
+ long msgId,
+ GridDhtCacheEntry cached,
+ IgniteTxEntry entry,
+ AffinityTopologyVersion topVer
+ ) {
+ // We are in near transaction, do not add local node as reader.
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void sendFinishReply(@Nullable Throwable err) {
+ // We are in near transaction, do not send finish reply to local node.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) {
+ PREP_FUT_UPD.compareAndSet(this, fut, null);
+ }
+
+ /**
+ * Marks transaction as requiring a check of its commit on backup.
+ */
+ void markForBackupCheck() {
+ needCheckBackup = true;
+ }
+
+ /**
+ * @return {@code True} if tx commit needs to be checked on backup; the pending flag is reset when it was set.
+ */
+ boolean onNeedCheckBackup() {
+ Boolean check = needCheckBackup;
+
+ if (check != null && check) {
+ needCheckBackup = false;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return If backup check was requested.
+ */
+ boolean needCheckBackup() {
+ return needCheckBackup != null;
+ }
+
+ /**
+ * @return {@code True} if transaction contains at least one near cache key mapped to the local node.
+ */
+ public boolean nearLocallyMapped() {
+ return nearLocallyMapped;
+ }
+
+ /**
+ * @param nearLocallyMapped {@code True} if transaction contains near key mapped to the local node.
+ */
+ void nearLocallyMapped(boolean nearLocallyMapped) {
+ this.nearLocallyMapped = nearLocallyMapped;
+ }
+
+ /**
+ * @return {@code True} if transaction contains colocated key mapped to the local node.
+ */
+ public boolean colocatedLocallyMapped() {
+ return colocatedLocallyMapped;
+ }
+
+ /**
+ * @param colocatedLocallyMapped {@code True} if transaction contains colocated key mapped to the local node.
+ */
+ public void colocatedLocallyMapped(boolean colocatedLocallyMapped) {
+ this.colocatedLocallyMapped = colocatedLocallyMapped;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) {
+ return entry.detached() || super.ownsLockUnsafe(entry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException {
+ return entry.detached() || super.ownsLock(entry);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param map Map to put.
+ * @param retval Flag indicating whether a value should be returned.
+ * @return Future for put operation.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Map<? extends K, ? extends V> map,
+ boolean retval
+ ) {
+ return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
+ map,
+ null,
+ null,
+ null,
+ retval);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Future for put operation.
+ */
+ public final <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ V val,
+ boolean retval,
+ CacheEntryPredicate filter) {
+ return putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ val,
+ null,
+ null,
+ retval,
+ filter);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @return Operation future.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ EntryProcessor<K, V, Object> entryProcessor,
+ Object... invokeArgs) {
+ return (IgniteInternalFuture)putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ null,
+ entryProcessor,
+ invokeArgs,
+ true,
+ null);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param map Entry processors map.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @return Operation future.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
+ Object... invokeArgs
+ ) {
+ return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
+ null,
+ map,
+ invokeArgs,
+ null,
+ true);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param drMap DR map to put.
+ * @return Future for DR put operation.
+ */
+ public IgniteInternalFuture<?> putAllDrAsync(
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheDrInfo> drMap
+ ) {
+ Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+ @Override public Object apply(GridCacheDrInfo val) {
+ return val.value();
+ }
+ });
+
+ return this.<Object, Object>putAllAsync0(cacheCtx,
+ null,
+ map,
+ null,
+ null,
+ drMap,
+ false);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param drMap DR map.
+ * @return Future for asynchronous remove.
+ */
+ public IgniteInternalFuture<?> removeAllDrAsync(
+ GridCacheContext cacheCtx,
+ Map<KeyCacheObject, GridCacheVersion> drMap
+ ) {
+ return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys to remove.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for asynchronous remove.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
+ GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<? extends K> keys,
+ boolean retval,
+ CacheEntryPredicate filter,
+ boolean singleRmv
+ ) {
+ return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv);
+ }
+
+ /**
+ * Internal method for single update operation.
+ *
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Operation future.
+ */
+ private <K, V> IgniteInternalFuture putAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ K key,
+ @Nullable V val,
+ @Nullable EntryProcessor<K, V, Object> entryProcessor,
+ @Nullable final Object[] invokeArgs,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter
+ ) {
+ assert key != null;
+
+ try {
+ beforePut(cacheCtx, retval);
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ opCtx != null ? opCtx.expiry() : null,
+ entryProcessor,
+ invokeArgs,
+ retval,
+ /*lockOnly*/false,
+ filters,
+ ret,
+ opCtx != null && opCtx.skipStore(),
+ /*singleRmv*/false,
+ keepBinary,
+ dataCenterId);
+
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null)
+ loadFut.get();
+
+ final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey);
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for put on key: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ /*read*/entryProcessor != null, // Needed to force load from store.
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override public GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/false,
+ retval,
+ /*read*/false,
+ -1L,
+ filters,
+ /*computeInvoke*/true);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ }
+ else
+ return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
+ * Internal method for all put and transform operations. Only one of {@code map}, {@code invokeMap}
+ * maps must be non-null.
+ *
+ * @param cacheCtx Context.
+ * @param map Key-value map to store.
+ * @param invokeMap Invoke map.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param drMap DR map.
+ * @param retval Flag indicating whether a value should be returned.
+ * @return Operation future.
+ */
+ @SuppressWarnings("unchecked")
+ private <K, V> IgniteInternalFuture putAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable Map<? extends K, ? extends V> map,
+ @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
+ @Nullable final Object[] invokeArgs,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
+ final boolean retval
+ ) {
+ try {
+ beforePut(cacheCtx, retval);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+ assert map != null || invokeMap != null;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
+ // Cached entry may be passed only from entry wrapper.
+ final Map<?, ?> map0 = map;
+ final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+
+ if (log.isDebugEnabled())
+ log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+
+ assert map0 != null || invokeMap0 != null;
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+ if (implicit())
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return new GridFinishedFuture<>(ret.success(true));
+ }
+
+ try {
+ Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
+
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
+
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ keySet,
+ opCtx != null ? opCtx.expiry() : null,
+ map0,
+ invokeMap0,
+ invokeArgs,
+ retval,
+ false,
+ CU.filterArray(null),
+ ret,
+ enlisted,
+ drMap,
+ null,
+ opCtx != null && opCtx.skipStore(),
+ false,
+ keepBinary,
+ dataCenterId);
+
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null) {
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ /*read*/invokeMap != null, // Needed to force load from store.
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override public GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/false,
+ retval,
+ /*read*/false,
+ -1L,
+ CU.filterArray(null),
+ /*computeInvoke*/true);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ }
+ else
+ return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key to enlist.
+ * @param val Value.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param entryProcessor Entry processor (for invoke operation).
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for entry values loading.
+ */
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ KeyCacheObject cacheKey,
+ Object val,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable EntryProcessor<K, V, Object> entryProcessor,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ boolean skipStore,
+ final boolean singleRmv,
+ boolean keepBinary,
+ Byte dataCenterId) {
+ try {
+ addActiveCache(cacheCtx);
+
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
+ if (entryProcessor != null)
+ transform = true;
+
+ GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ /*drVer*/drVer,
+ /*drTtl*/-1L,
+ /*drExpireTime*/-1L,
+ ret,
+ /*enlisted*/null,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer,
+ keepBinary);
+
+ if (loadMissed) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ Collections.singleton(cacheKey),
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+ retval,
+ keepBinary,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * Internal routine for enlisting multi-key put, invoke and remove operations.
+ *
+ * @param cacheCtx Cache context.
+ * @param keys Keys to enlist.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param lookup Value lookup map ({@code null} for remove).
+ * @param invokeMap Map with entry processors for invoke operation.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param enlisted Collection of keys enlisted into this transaction.
+ * @param drPutMap DR put map (optional).
+ * @param drRmvMap DR remove map (optional).
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @param keepBinary Keep binary flag.
+ * @param dataCenterId Optional data center ID.
+ * @return Future for missing values loading.
+ */
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<?> keys,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable Map<?, ?> lookup,
+ @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ Collection<KeyCacheObject> enlisted,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
+ boolean skipStore,
+ final boolean singleRmv,
+ final boolean keepBinary,
+ Byte dataCenterId
+ ) {
+ assert retval || invokeMap == null;
+
+ try {
+ addActiveCache(cacheCtx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ boolean rmv = lookup == null && invokeMap == null;
+
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
+ try {
+ // Set transform flag for transaction.
+ if (invokeMap != null)
+ transform = true;
+
+ Set<KeyCacheObject> missedForLoad = null;
+
+ for (Object key : keys) {
+ if (key == null) {
+ rollback();
+
+ throw new NullPointerException("Null key.");
+ }
+
+ Object val = rmv || lookup == null ? null : lookup.get(key);
+ EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
+
+ GridCacheVersion drVer;
+ long drTtl;
+ long drExpireTime;
+
+ if (drPutMap != null) {
+ GridCacheDrInfo info = drPutMap.get(key);
+
+ assert info != null;
+
+ drVer = info.version();
+ drTtl = info.ttl();
+ drExpireTime = info.expireTime();
+ }
+ else if (drRmvMap != null) {
+ assert drRmvMap.get(key) != null;
+
+ drVer = drRmvMap.get(key);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else if (dataCenterId != null) {
+ drVer = cctx.versions().next(dataCenterId);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else {
+ drVer = null;
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+
+ if (!rmv && val == null && entryProcessor == null) {
+ setRollbackOnly();
+
+ throw new NullPointerException("Null value.");
+ }
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ drVer,
+ drTtl,
+ drExpireTime,
+ ret,
+ enlisted,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer,
+ keepBinary);
+
+ if (loadMissed) {
+ if (missedForLoad == null)
+ missedForLoad = new HashSet<>();
+
+ missedForLoad.add(cacheKey);
+ }
+ }
+
+ if (missedForLoad != null) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ missedForLoad,
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
+ retval,
+ keepBinary,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param retval Return value flag.
+ * @param lockOnly Lock only flag.
+ * @param filter Filter.
+ * @param drVer DR version.
+ * @param drTtl DR ttl.
+ * @param drExpireTime DR expire time.
+ * @param ret Return value.
+ * @param enlisted Enlisted keys collection.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param needVal {@code True} if value is needed.
+ * @param needReadVer {@code True} if need read entry version.
+ * @return {@code True} if entry value should be loaded.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ final KeyCacheObject cacheKey,
+ @Nullable final Object val,
+ @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
+ @Nullable final Object[] invokeArgs,
+ @Nullable final ExpiryPolicy expiryPlc,
+ final boolean retval,
+ final boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheVersion drVer,
+ final long drTtl,
+ long drExpireTime,
+ final GridCacheReturn ret,
+ @Nullable final Collection<KeyCacheObject> enlisted,
+ boolean skipStore,
+ boolean singleRmv,
+ boolean hasFilters,
+ final boolean needVal,
+ boolean needReadVer,
+ boolean keepBinary
+ ) throws IgniteCheckedException {
+ boolean loadMissed = false;
+
+ final boolean rmv = val == null && entryProcessor == null;
+
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+ IgniteTxEntry txEntry = entry(txKey);
+
+ // First time access.
+ if (txEntry == null) {
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
+
+ try {
+ entry.unswap(false);
+
+ // Check if lock is being explicitly acquired by the same thread.
+ if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+ entry.lockedByThread(threadId, xidVer)) {
+ throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+ "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+ ", entry=" + entry +
+ ", xidVer=" + xidVer +
+ ", threadId=" + threadId +
+ ", locNodeId=" + cctx.localNodeId() + ']');
+ }
+
+ CacheObject old = null;
+ GridCacheVersion readVer = null;
+
+ if (optimistic() && !implicit()) {
+ try {
+ if (needReadVer) {
+ EntryGetResult res = primaryLocal(entry) ?
+ entry.innerGetVersioned(
+ null,
+ this,
+ /*swap*/false,
+ /*unmarshal*/retval || needVal,
+ /*metrics*/retval,
+ /*events*/retval,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null,
+ keepBinary,
+ null) : null;
+
+ if (res != null) {
+ old = res.value();
+ readVer = res.version();
+ }
+ }
+ else {
+ old = entry.innerGet(
+ null,
+ this,
+ /*swap*/false,
+ /*read-through*/false,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null,
+ keepBinary);
+ }
+ }
+ catch (ClusterTopologyCheckedException e) {
+ entry.context().evicts().touch(entry, topologyVersion());
+
+ throw e;
+ }
+ }
+ else
+ old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+
+ final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+ if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+ ret.set(cacheCtx, old, false, keepBinary);
+
+ if (!readCommitted()) {
+ if (optimistic() && serializable()) {
+ txEntry = addEntry(op,
+ old,
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+ }
+ else {
+ txEntry = addEntry(READ,
+ old,
+ null,
+ null,
+ entry,
+ null,
+ CU.empty0(),
+ false,
+ -1L,
+ -1L,
+ null,
+ skipStore,
+ keepBinary);
+ }
+
+ txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
+ }
+
+ if (readCommitted())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ break; // While.
+ }
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+
+ if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ if (enlisted != null)
+ enlisted.add(cacheKey);
+
+ if (!pessimistic() && !implicit()) {
+ txEntry.markValid();
+
+ if (old == null) {
+ if (needVal)
+ loadMissed = true;
+ else {
+ assert !implicit() || !transform : this;
+ assert txEntry.op() != TRANSFORM : txEntry;
+
+ if (retval)
+ ret.set(cacheCtx, null, true, keepBinary);
+ else
+ ret.success(true);
+ }
+ }
+ else {
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
+
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true, keepBinary);
+ else {
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : txEntry;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version " +
+ "[err=" + ex.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(txEntry, old, ret, ver);
+ }
+ else
+ ret.success(true);
+ }
+ }
+ }
+ // Pessimistic.
+ else {
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true, keepBinary);
+ else
+ ret.success(true);
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction putAll0 method: " + entry);
+ }
+ }
+ }
+ else {
+ if (entryProcessor == null && txEntry.op() == TRANSFORM)
+ throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+ "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
+
+ GridCacheEntryEx entry = txEntry.cached();
+
+ CacheObject v = txEntry.value();
+
+ boolean del = txEntry.op() == DELETE && rmv;
+
+ if (!del) {
+ if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
+ ret.set(cacheCtx, v, false, keepBinary);
+
+ return loadMissed;
+ }
+
+ GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+ v != null ? UPDATE : CREATE;
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore,
+ keepBinary);
+
+ if (enlisted != null)
+ enlisted.add(cacheKey);
+
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(txEntry, txEntry.value(), ret, ver);
+ }
+ }
+
+ if (!pessimistic()) {
+ txEntry.markValid();
+
+ if (retval && !transform)
+ ret.set(cacheCtx, v, true, keepBinary);
+ else
+ ret.success(true);
+ }
+ }
+
+ return loadMissed;
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys to remove.
+ * @param drMap DR map.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for asynchronous remove.
+ */
+ @SuppressWarnings("unchecked")
+ private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ @Nullable final Collection<? extends K> keys,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter,
+ boolean singleRmv) {
+ try {
+ checkUpdatesAllowed(cacheCtx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+
+ if (retval)
+ needReturnValue(true);
+
+ final Collection<?> keys0;
+
+ if (drMap != null) {
+ assert keys == null;
+
+ keys0 = drMap.keySet();
+ }
+ else
+ keys0 = keys;
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
+ assert keys0 != null;
+
+ if (log.isDebugEnabled())
+ log.debug(S.toString("Called removeAllAsync(...)",
+ "tx", this, false,
+ "keys", keys0, true,
+ "implicit", implicit, false,
+ "retval", retval, false));
+
+ try {
+ checkValid();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ if (F.isEmpty(keys0)) {
+ if (implicit()) {
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ return new GridFinishedFuture<>(ret.success(true));
+ }
+
+ init();
+
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>();
+
+ ExpiryPolicy plc;
+
+ final CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+ if (!F.isEmpty(filters))
+ plc = opCtx != null ? opCtx.expiry() : null;
+ else
+ plc = null;
+
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ entryTopVer,
+ keys0,
+ plc,
+ /*lookup map*/null,
+ /*invoke map*/null,
+ /*invoke arguments*/null,
+ retval,
+ /*lock only*/false,
+ filters,
+ ret,
+ enlisted,
+ null,
+ drMap,
+ opCtx != null && opCtx.skipStore(),
+ singleRmv,
+ keepBinary,
+ dataCenterId
+ );
+
+ if (log.isDebugEnabled())
+ log.debug("Remove keys: " + enlisted);
+
+ // Acquire locks only after having added operation to the write set.
+ // Otherwise, during rollback we will not know whether locks need
+ // to be rolled back.
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ if (loadFut != null) {
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ timeout,
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for remove on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/true,
+ retval,
+ /*read*/false,
+ -1L,
+ filters,
+ /*computeInvoke*/false);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ else {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
+
+ return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
+
+ return new GridCacheReturn(cacheCtx, true, keepBinary,
+ implicitRes.value(), implicitRes.success());
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ rollbackAsync();
+
+ throw e;
+ }
+ }
+ }));
+ }
+ else {
+ return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+ throws IgniteCheckedException {
+ f.get();
+
+ return ret;
+ }
+ }));
+ }
+ }
+ }
+
+ /**
+ * Enlists the given keys for read within this transaction and returns a future for the resulting map.
+ * <p>
+ * An empty key collection yields a finished future with an empty map. For a pessimistic transaction that is
+ * not read-committed (and when values are not skipped) locks are requested through {@code txLockAsync(..)} and
+ * values are read from the tx entries once the lock future completes; otherwise keys recorded as missed by
+ * {@code enlistRead(..)} are handed to {@code checkMissed(..)}. If an {@link IgniteCheckedException} is thrown
+ * the transaction is marked rollback-only and a finished future carrying the error is returned.
+ *
+ * @param cacheCtx Cache context.
+ * @param entryTopVer Topology version passed to {@code enlistRead(..)}; also used for {@code checkMissed(..)}
+ * when {@code topologyVersionSnapshot()} returns {@code null}. May be {@code null}.
+ * @param keys Keys to get.
+ * @param deserializeBinary Deserialize binary flag.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
+ * @param skipStore Skip store flag.
+ * @param needVer If {@code true}, values are read with {@code innerGetVersioned(..)} and the read version
+ * is stored in the tx entry; the flag is also passed to {@code enlistRead(..)} and {@code checkMissed(..)}.
+ * @return Future for this get.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
+ final GridCacheContext cacheCtx,
+ @Nullable final AffinityTopologyVersion entryTopVer,
+ Collection<KeyCacheObject> keys,
+ final boolean deserializeBinary,
+ final boolean skipVals,
+ final boolean keepCacheObjects,
+ final boolean skipStore,
+ final boolean needVer) {
+ if (F.isEmpty(keys))
+ return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
+
+ init();
+
+ int keysCnt = keys.size();
+
+ boolean single = keysCnt == 1;
+
+ try {
+ checkValid();
+
+ final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+
+ final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+
+ final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
+ entryTopVer,
+ keys,
+ expiryPlc,
+ retMap,
+ missed,
+ keysCnt,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
+ needVer);
+
+ if (single && missed.isEmpty())
+ return new GridFinishedFuture<>(retMap);
+
+ // Handle locks (pessimistic, not read-committed, values requested).
+ if (pessimistic() && !readCommitted() && !skipVals) {
+ if (expiryPlc == null)
+ expiryPlc = cacheCtx.expiry();
+
+ long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
+ long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
+
+ long timeout = remainingTime();
+
+ if (timeout == -1)
+ return new GridFinishedFuture<>(timeoutException());
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
+ timeout,
+ this,
+ true,
+ true,
+ isolation,
+ isInvalidate(),
+ createTtl,
+ accessTtl);
+
+ final ExpiryPolicy expiryPlc0 = expiryPlc;
+
+ PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+ @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+
+ // Load keys only after the locks have been acquired.
+ for (KeyCacheObject cacheKey : lockKeys) {
+ K keyVal = (K)
+ (keepCacheObjects ? cacheKey :
+ cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
+
+ if (retMap.containsKey(keyVal))
+ // We already have a return value.
+ continue;
+
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
+
+ IgniteTxEntry txEntry = entry(txKey);
+
+ assert txEntry != null;
+
+ // Check if there is cached value.
+ while (true) {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ CacheObject val = null;
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ try {
+ Object transformClo =
+ (!F.isEmpty(txEntry.entryProcessors()) &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.entryProcessors()) : null;
+
+ if (needVer) {
+ getRes = cached.innerGetVersioned(
+ null,
+ GridNearTxLocal.this,
+ /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
+ /*unmarshal*/true,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(GridNearTxLocal.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary(),
+ null);
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else{
+ val = cached.innerGet(
+ null,
+ GridNearTxLocal.this,
+ cacheCtx.isSwapOrOffheapEnabled(),
+ /*read-through*/false,
+ /*metrics*/true,
+ /*events*/!skipVals,
+ /*temporary*/false,
+ CU.subjectId(GridNearTxLocal.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
+
+ // If value is in cache and passed the filter.
+ if (val != null) {
+ missed.remove(cacheKey);
+
+ txEntry.setAndMarkValid(val);
+
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ cacheCtx.addResult(retMap,
+ cacheKey,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+
+ if (readVer != null)
+ txEntry.entryReadVersion(readVer);
+ }
+
+ // Even though we bring the value back from lock acquisition,
+ // we still need to recheck primary node for consistent values
+ // in case of concurrent transactional locks.
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed exception in get postLock (will retry): " +
+ cached);
+
+ txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
+ }
+ }
+ }
+
+ if (!missed.isEmpty() && cacheCtx.isLocal()) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ retMap,
+ missed,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
+ needVer,
+ expiryPlc0);
+ }
+
+ return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); // Values were added to retMap above; finClos merges this empty map into it.
+ }
+ };
+
+ FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+ @Override Map<K, V> finish(Map<K, V> loaded) {
+ retMap.putAll(loaded); // Merge the map produced by the postLock future into the result map.
+
+ return retMap;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+
+ return fut1.isDone() ?
+ new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
+ new GridEmbeddedFuture<>(finClos, fut1);
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return plc2.apply(false, e);
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return new GridEmbeddedFuture<>(
+ fut,
+ plc2,
+ finClos);
+ }
+ }
+ else {
+ assert optimistic() || readCommitted() || skipVals;
+
+ if (!missed.isEmpty()) {
+ if (!readCommitted())
+ for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
+ KeyCacheObject cacheKey = it.next();
+
+ K keyVal =
+ (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
+
+ if (retMap.containsKey(keyVal))
+ it.remove();
+ }
+
+ if (missed.isEmpty())
+ return new GridFinishedFuture<>(retMap);
+
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
+ return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
+ retMap,
+ missed,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
+ needVer,
+ expiryPlc);
+ }
+
+ return new GridFinishedFuture<>(retMap);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ setRollbackOnly();
+
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
+ * Enlists the given keys for read, filling {@code map} with values that are already available and
+ * {@code missed} with keys whose values still have to be obtained.
+ * <p>
+ * The {@code CACHE_READ} permission is checked first. For a key that already has a tx entry, the entry value
+ * (after applying entry processors) is added to {@code map}; if the entry has no value (asserted to be a
+ * {@code TRANSFORM} entry) the value is read from the cached entry and either added to {@code map} or the key
+ * is put into {@code missed}. For a key accessed for the first time, the key is added to the returned lock
+ * keys unless {@code skipVals} is set, the value is read from the cache entry or the key is put into
+ * {@code missed}, and a {@code READ} tx entry is added unless the transaction is read-committed or
+ * {@code skipVals} is set.
+ *
+ * @param cacheCtx Cache context.
+ * @param entryTopVer Topology version used for entry lookup; when {@code null}, {@code topologyVersion()}
+ * is used instead.
+ * @param keys Key to enlist.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param map Return map.
+ * @param missed Map of missed keys.
+ * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+ * @param deserializeBinary Deserialize binary flag.
+ * @param skipVals Skip values flag.
+ * @param keepCacheObjects Keep cache objects flag.
+ * @param skipStore Skip store flag.
+ * @param needVer If {@code true}, an entry version is resolved for each returned value and passed to
+ * {@code addResult(..)}.
+ * @throws IgniteCheckedException If failed.
+ * @return Enlisted keys (keys accessed for the first time when {@code skipVals} is {@code false}), or an
+ * empty list if there are none.
+ */
+ @SuppressWarnings({"RedundantTypeArguments"})
+ private <K, V> Collection<KeyCacheObject> enlistRead(
+ final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
+ Collection<KeyCacheObject> keys,
+ @Nullable ExpiryPolicy expiryPlc,
+ Map<K, V> map,
+ Map<KeyCacheObject, GridCacheVersion> missed,
+ int keysCnt,
+ boolean deserializeBinary,
+ boolean skipVals,
+ boolean keepCacheObjects,
+ boolean skipStore,
+ final boolean needVer
+ ) throws IgniteCheckedException {
+ assert !F.isEmpty(keys);
+ assert keysCnt == keys.size();
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ boolean single = keysCnt == 1;
+
+ Collection<KeyCacheObject> lockKeys = null;
+
+ AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
+
+ boolean needReadVer = (serializable() && optimistic()) || needVer;
+
+ // In this loop we cover only read-committed or optimistic transactions.
+ // Transactions that are pessimistic and not read-committed are covered
+ // outside of this loop.
+ for (KeyCacheObject key : keys) {
+ if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
+ addActiveCache(cacheCtx);
+
+ IgniteTxKey txKey = cacheCtx.txKey(key);
+
+ // Check write map (always check writes first).
+ IgniteTxEntry txEntry = entry(txKey);
+
+ // Either non-read-committed or there was a previous write.
+ if (txEntry != null) {
+ CacheObject val = txEntry.value();
+
+ if (txEntry.hasValue()) {
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ if (val != null) {
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ if (txEntry.op() != READ)
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+ else {
+ ver = txEntry.entryReadVersion();
+
+ if (ver == null && pessimistic()) {
+ while (true) {
+ try {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ ver = cached.isNear() ?
+ ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
+ }
+
+ if (ver == null) {
+ assert optimistic() && repeatableRead() : this;
+
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+ }
+ }
+
+ assert ver != null;
+ }
+
+ cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
+ ver, 0, 0);
+ }
+ }
+ else {
+ assert txEntry.op() == TRANSFORM;
+
+ while (true) {
+ try {
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ Object transformClo =
+ (txEntry.op() == TRANSFORM &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.entryProcessors()) : null;
+
+ if (needVer) {
+ getRes = txEntry.cached().innerGetVersioned(
+ null,
+ this,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary(),
+ null);
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else {
+ val = txEntry.cached().innerGet(
+ null,
+ this,
+ /*swap*/true,
+ /*read-through*/false,
+ /*metrics*/true,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
+
+ if (val != null) {
+ if (!readCommitted() && !skipVals)
+ txEntry.readValue(val);
+
+ if (!F.isEmpty(txEntry.entryProcessors()))
+ val = txEntry.applyEntryProcessors(val);
+
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+ }
+ else
+ missed.put(key, txEntry.cached().version());
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
+ }
+ }
+ // First time access within transaction.
+ else {
+ if (lockKeys == null && !skipVals)
+ lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
+
+ if (!single && !skipVals)
+ lockKeys.add(key); // For a single key the singleton created above already contains it.
+
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+
+ try {
+ GridCacheVersion ver = entry.version();
+
+ CacheObject val = null;
+ GridCacheVersion readVer = null;
+ EntryGetResult getRes = null;
+
+ if (!pessimistic() || readCommitted() && !skipVals) { // Parsed as: !pessimistic() || (readCommitted() && !skipVals).
+ IgniteCacheExpiryPolicy accessPlc =
+ optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
+
+ if (needReadVer) {
+ getRes = primaryLocal(entry) ?
+ entry.innerGetVersioned(
+ null,
+ this,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc,
+ !deserializeBinary,
+ null) : null;
+
+ if (getRes != null) {
+ val = getRes.value();
+ readVer = getRes.version();
+ }
+ }
+ else {
+ val = entry.innerGet(
+ null,
+ this,
+ /*swap*/true,
+ /*read-through*/false,
+ /*metrics*/true,
+ /*event*/true,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc,
+ !deserializeBinary);
+ }
+
+ if (val != null) {
+ cacheCtx.addResult(map,
+ key,
+ val,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ getRes,
+ readVer,
+ 0,
+ 0,
+ needVer);
+ }
+ else
+ missed.put(key, ver);
+ }
+ else
+ // We must wait for the lock in pessimistic mode.
+ missed.put(key, ver);
+
+ if (!readCommitted() && !skipVals) {
+ txEntry = addEntry(READ,
+ val,
+ null,
+ null,
+ entry,
+ expiryPlc,
+ null,
+ true,
+ -1L,
+ -1L,
+ null,
+ skipStore,
+ !deserializeBinary);
+
+ // As optimization, mark as checked immediately
+ // for non-pessimistic if value is not null.
+ if (val != null && !pessimistic()) {
+ txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.entryReadVersion(readVer);
+ }
+ }
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+ }
+ finally {
+ if (entry != null && readCommitted()) {
+ if (cacheCtx.isNear()) {
+ if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
+ if (entry.markObsolete(xidVer))
+ cacheCtx.cache().removeEntry(entry);
+ }
+ }
+ else
+ entry.context().evicts().touch(entry, topVer);
+ }
+ }
+ }
+ }
+ }
+
+ return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
+ }
+
+ /**
+ * Loads values for the given keys. A closure is built that, for every loaded key/value, updates the
+ * corresponding tx entry and {@code ret}; the actual load is delegated to the {@code loadMissing(..)}
+ * overload that accepts this closure.
+ *
+ * @param cacheCtx Cache context.
+ * @param topVer Topology version passed to the delegated {@code loadMissing(..)} call.
+ * @param keys Keys to load.
+ * @param filter Filter.
+ * @param ret Return value.
+ * @param needReadVer Read version flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param readThrough Read through flag.
+ * @param retval Return value flag.
+ * @param keepBinary Keep binary flag passed to {@code ret.set(..)} and to the delegated call.
+ * @param expiryPlc Expiry policy.
+ * @return Load future.
+ */
+ private IgniteInternalFuture<Void> loadMissing(
+ final GridCacheContext cacheCtx,
+ final AffinityTopologyVersion topVer,
+ final Set<KeyCacheObject> keys,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ final boolean needReadVer,
+ final boolean singleRmv,
+ final boolean hasFilters,
+ final boolean readThrough,
+ final boolean retval,
+ final boolean keepBinary,
+ final ExpiryPolicy expiryPlc) {
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+ new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+ @Override public void apply(KeyCacheObject key,
+ @Nullable Object val,
+ @Nullable GridCacheVersion loadVer) {
+ if (log.isDebugEnabled())
+ log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+
+ IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
+
+ assert e != null;
+
+ if (needReadVer) {
+ assert loadVer != null;
+
+ e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ }
+
+ if (singleRmv) {
+ assert !hasFilters && !retval;
+ assert val == null || Boolean.TRUE.equals(val) : val;
+
+ ret.set(cacheCtx, null, val != null, keepBinary);
+ }
+ else {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+ if (e.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ e.readValue(cacheVal);
- return null;
- }
+ try {
+ ver = e.cached().version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : e;
- /** {@inheritDoc} */
- @Override protected IgniteInternalFuture<Boolean> addReader(
- long msgId,
- GridDhtCacheEntry cached,
- IgniteTxEntry entry,
- AffinityTopologyVersion topVer
- ) {
- // We are in near transaction, do not add local node as reader.
- return null;
- }
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
- /** {@inheritDoc} */
- @Override protected void sendFinishReply(@Nullable Throwable err) {
- // We are in near transaction, do not send finish reply to local node.
- }
+ ver = null;
+ }
- /** {@inheritDoc} */
- @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) {
- PREP_FUT_UPD.compareAndSet(this, fut, null);
- }
+ addInvokeResult(e, cacheVal, ret, ver);
+ }
+ else {
+ boolean success;
- /**
- * Marks transaction to check if commit on backup.
- */
- public void markForBackupCheck() {
- needCheckBackup = true;
+ if (hasFilters) {
+ success = isAll(e.context(), key, cacheVal, filter);
+
+ if (!success)
+ e.value(cacheVal, false, false);
+ }
+ else
+ success = true;
+
+ ret.set(cacheCtx, cacheVal, success, keepBinary);
+ }
+ }
+ }
+ };
+
+ return loadMissing(
+ cacheCtx,
+ topVer,
+ readThrough,
+ /*async*/true,
+ keys,
+ /*skipVals*/singleRmv,
+ needReadVer,
+ keepBinary,
+ expiryPlc,
+ c);
}
/**
- * @return If need to check tx commit on backup.
+ * @param cacheCtx Cache context.
+ * @param loadFut Missing keys load future.
+ * @param ret Future result.
+ * @param keepBinary Keep binary flag.
+ * @return Future.
*/
- public boolean onNeedCheckBackup() {
- Boolean check = needCheckBackup;
+ private IgniteInternalFuture optimisticPutFuture(
+ final GridCacheContext cacheCtx,
+ IgniteInternalFuture<Void> loadFut,
+ final GridCacheReturn ret,
+ final boolean keepBinary
+ ) {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
- if (check != null && check) {
- needCheckBackup = false;
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
- return true;
- }
<TRUNCATED>
[2/5] ignite git commit: ignite-4768 txs
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b1a4003..d457399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -18,33 +18,23 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.io.Externalizable;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import javax.cache.Cache;
-import javax.cache.CacheException;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -58,9 +48,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -68,36 +55,26 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.C2;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
@@ -105,8 +82,6 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
-import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
-import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -391,142 +366,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return null;
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> loadMissing(
- final GridCacheContext cacheCtx,
- final AffinityTopologyVersion topVer,
- final boolean readThrough,
- boolean async,
- final Collection<KeyCacheObject> keys,
- boolean skipVals,
- boolean needVer,
- boolean keepBinary,
- final ExpiryPolicy expiryPlc,
- final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
- ) {
- assert cacheCtx.isLocal() : cacheCtx.name();
-
- if (!readThrough || !cacheCtx.readThrough()) {
- for (KeyCacheObject key : keys)
- c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
-
- return new GridFinishedFuture<>();
- }
-
- try {
- IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
- accessPolicy(cacheCtx, keys) :
- cacheCtx.cache().expiryPolicy(expiryPlc);
-
- Map<KeyCacheObject, GridCacheVersion> misses = null;
-
- for (KeyCacheObject key : keys) {
- while (true) {
- IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
- GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) :
- txEntry.cached();
-
- if (entry == null)
- continue;
-
- try {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- this,
- /*readSwap*/true,
- /*unmarshal*/true,
- /*update-metrics*/!skipVals,
- /*event*/!skipVals,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- expiryPlc0,
- txEntry == null ? keepBinary : txEntry.keepBinary(),
- null);
-
- if (res == null) {
- if (misses == null)
- misses = new LinkedHashMap<>();
-
- misses.put(key, entry.version());
- }
- else
- c.apply(key, skipVals ? true : res.value(), res.version());
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry, will retry: " + key);
-
- if (txEntry != null)
- txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion()));
- }
- }
- }
-
- if (misses != null) {
- final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
-
- cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
- @Override public void apply(KeyCacheObject key, Object val) {
- GridCacheVersion ver = misses0.remove(key);
-
- assert ver != null : key;
-
- if (val != null) {
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
- while (true) {
- GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
-
- try {
- EntryGetResult verVal = entry.versionedValue(cacheVal,
- ver,
- null,
- null,
- null);
-
- if (log.isDebugEnabled()) {
- log.debug("Set value loaded from store into entry [" +
- "oldVer=" + ver +
- ", newVer=" + verVal.version() +
- ", entry=" + entry + ']');
- }
-
- ver = verVal.version();
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry, (will retry): " + entry);
- }
- catch (IgniteCheckedException e) {
- // Wrap errors (will be unwrapped).
- throw new GridClosureException(e);
- }
- }
- }
- else
- ver = SER_READ_EMPTY_ENTRY_VER;
-
- c.apply(key, val, ver);
- }
- });
-
- for (KeyCacheObject key : misses0.keySet())
- c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
- }
-
- return new GridFinishedFuture<>();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
/**
* Gets minimum version present in transaction.
*
@@ -1103,2484 +942,226 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @param entry Entry.
- * @return {@code True} if local node is current primary for given entry.
+ * @param ctx Cache context.
+ * @param key Key.
+ * @param expiryPlc Expiry policy.
+ * @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
*/
- private boolean primaryLocal(GridCacheEntryEx entry) {
- return entry.context().affinity().primaryByPartition(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE);
+ protected IgniteCacheExpiryPolicy accessPolicy(
+ GridCacheContext ctx,
+ IgniteTxKey key,
+ @Nullable ExpiryPolicy expiryPlc
+ ) {
+ return null;
}
/**
* @param cacheCtx Cache context.
- * @param keys Key to enlist.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param map Return map.
- * @param missed Map of missed keys.
- * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
- * @param deserializeBinary Deserialize binary flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects flag.
- * @param skipStore Skip store flag.
- * @throws IgniteCheckedException If failed.
- * @return Enlisted keys.
+ * @param keys Keys.
+ * @return Expiry policy.
*/
- @SuppressWarnings({"RedundantTypeArguments"})
- private <K, V> Collection<KeyCacheObject> enlistRead(
- final GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
- @Nullable ExpiryPolicy expiryPlc,
- Map<K, V> map,
- Map<KeyCacheObject, GridCacheVersion> missed,
- int keysCnt,
- boolean deserializeBinary,
- boolean skipVals,
- boolean keepCacheObjects,
- boolean skipStore,
- final boolean needVer
- ) throws IgniteCheckedException {
- assert !F.isEmpty(keys);
- assert keysCnt == keys.size();
-
- cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
-
- boolean single = keysCnt == 1;
-
- Collection<KeyCacheObject> lockKeys = null;
-
- AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
-
- boolean needReadVer = (serializable() && optimistic()) || needVer;
-
- // In this loop we cover only read-committed or optimistic transactions.
- // Transactions that are pessimistic and not read-committed are covered
- // outside of this loop.
- for (KeyCacheObject key : keys) {
- if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
- addActiveCache(cacheCtx);
-
- IgniteTxKey txKey = cacheCtx.txKey(key);
-
- // Check write map (always check writes first).
- IgniteTxEntry txEntry = entry(txKey);
-
- // Either non-read-committed or there was a previous write.
- if (txEntry != null) {
- CacheObject val = txEntry.value();
-
- if (txEntry.hasValue()) {
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- if (val != null) {
- GridCacheVersion ver = null;
-
- if (needVer) {
- if (txEntry.op() != READ)
- ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
- else {
- ver = txEntry.entryReadVersion();
+ protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+ return null;
+ }
- if (ver == null && pessimistic()) {
- while (true) {
- try {
- GridCacheEntryEx cached = txEntry.cached();
+ /**
+ * Post lock processing for put or remove.
+ *
+ * @param cacheCtx Context.
+ * @param keys Keys.
+ * @param ret Return value.
+ * @param rmv {@code True} if remove.
+ * @param retval Flag to return value or not.
+ * @param read {@code True} if read.
+ * @param accessTtl TTL for read operation.
+ * @param filter Filter to check entries.
+ * @throws IgniteCheckedException If error.
+ * @param computeInvoke If {@code true} computes return value for invoke operation.
+ */
+ @SuppressWarnings("unchecked")
+ protected final void postLockWrite(
+ GridCacheContext cacheCtx,
+ Iterable<KeyCacheObject> keys,
+ GridCacheReturn ret,
+ boolean rmv,
+ boolean retval,
+ boolean read,
+ long accessTtl,
+ CacheEntryPredicate[] filter,
+ boolean computeInvoke
+ ) throws IgniteCheckedException {
+ for (KeyCacheObject k : keys) {
+ IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
- ver = cached.isNear() ?
- ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+ if (txEntry == null)
+ throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
+ "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
- }
- }
- }
+ while (true) {
+ GridCacheEntryEx cached = txEntry.cached();
- if (ver == null) {
- assert optimistic() && repeatableRead() : this;
+ try {
+ assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
+ "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
+ ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
- ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
- }
- }
+ if (log.isDebugEnabled())
+ log.debug("Post lock write entry: " + cached);
- assert ver != null;
- }
+ CacheObject v = txEntry.previousValue();
+ boolean hasPrevVal = txEntry.hasPreviousValue();
- cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
- ver, 0, 0);
- }
- }
- else {
- assert txEntry.op() == TRANSFORM;
+ if (onePhaseCommit())
+ filter = txEntry.filters();
- while (true) {
- try {
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
+ // If we have user-passed filter, we must read value into entry for peek().
+ if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
+ retval = true;
- Object transformClo =
- (txEntry.op() == TRANSFORM &&
- cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
- F.first(txEntry.entryProcessors()) : null;
+ boolean invoke = txEntry.op() == TRANSFORM;
- if (needVer) {
- getRes = txEntry.cached().innerGetVersioned(
- null,
- this,
- /*swap*/true,
- /*unmarshal*/true,
- /*update-metrics*/true,
- /*event*/!skipVals,
- CU.subjectId(this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary(),
- null);
+ if (retval || invoke) {
+ if (!cacheCtx.isNear()) {
+ if (!hasPrevVal) {
+ // Read through here only for a local cache: a non-local cache should read from store after lock on primary.
+ boolean readThrough = cacheCtx.isLocal() &&
+ (invoke || cacheCtx.loadPreviousValue()) &&
+ !txEntry.skipStore();
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
- }
- }
- else {
- val = txEntry.cached().innerGet(
+ v = cached.innerGet(
null,
this,
/*swap*/true,
- /*read-through*/false,
- /*metrics*/true,
- /*event*/!skipVals,
+ readThrough,
+ /*metrics*/!invoke,
+ /*event*/!invoke && !dht(),
/*temporary*/false,
CU.subjectId(this, cctx),
- transformClo,
+ null,
resolveTaskName(),
null,
txEntry.keepBinary());
}
-
- if (val != null) {
- if (!readCommitted() && !skipVals)
- txEntry.readValue(val);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- cacheCtx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
- }
- else
- missed.put(key, txEntry.cached().version());
-
- break;
}
- catch (GridCacheEntryRemovedException ignored) {
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ else {
+ if (!hasPrevVal)
+ v = cached.rawGetOrUnmarshal(false);
}
- }
- }
- }
- // First time access within transaction.
- else {
- if (lockKeys == null && !skipVals)
- lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
- if (!single && !skipVals)
- lockKeys.add(key);
+ if (txEntry.op() == TRANSFORM) {
+ if (computeInvoke) {
+ GridCacheVersion ver;
+
+ try {
+ ver = cached.version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
- while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
- try {
- GridCacheVersion ver = entry.version();
-
- CacheObject val = null;
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
-
- if (!pessimistic() || readCommitted() && !skipVals) {
- IgniteCacheExpiryPolicy accessPlc =
- optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
-
- if (needReadVer) {
- getRes = primaryLocal(entry) ?
- entry.innerGetVersioned(
- null,
- this,
- /*swap*/true,
- /*unmarshal*/true,
- /*metrics*/true,
- /*event*/true,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc,
- !deserializeBinary,
- null) : null;
-
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
+ ver = null;
}
- }
- else {
- val = entry.innerGet(
- null,
- this,
- /*swap*/true,
- /*read-through*/false,
- /*metrics*/true,
- /*event*/true,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc,
- !deserializeBinary);
- }
- if (val != null) {
- cacheCtx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
+ addInvokeResult(txEntry, v, ret, ver);
}
- else
- missed.put(key, ver);
}
else
- // We must wait for the lock in pessimistic mode.
- missed.put(key, ver);
-
- if (!readCommitted() && !skipVals) {
- txEntry = addEntry(READ,
- val,
- null,
- null,
- entry,
- expiryPlc,
- null,
- true,
- -1L,
- -1L,
- null,
- skipStore,
- !deserializeBinary);
-
- // As optimization, mark as checked immediately
- // for non-pessimistic if value is not null.
- if (val != null && !pessimistic()) {
- txEntry.markValid();
-
- if (needReadVer) {
- assert readVer != null;
-
- txEntry.entryReadVersion(readVer);
- }
- }
- }
-
- break; // While.
+ ret.value(cacheCtx, v, txEntry.keepBinary());
}
- catch (GridCacheEntryRemovedException ignored) {
+
+ boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
+
+ // For remove operation we return true only if we are removing something,
+ // i.e. cached value is not null.
+ ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
+
+ if (onePhaseCommit())
+ txEntry.filtersPassed(pass);
+
+ boolean updateTtl = read;
+
+ if (pass) {
+ txEntry.markValid();
+
if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+ log.debug("Filter passed in post lock for key: " + k);
}
- finally {
- if (entry != null && readCommitted()) {
- if (cacheCtx.isNear()) {
- if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
- if (entry.markObsolete(xidVer))
- cacheCtx.cache().removeEntry(entry);
- }
- }
- else
- entry.context().evicts().touch(entry, topVer);
+ else {
+ // Revert to the previous operation (if there is none, it is NOOP, so the entry will be unlocked).
+ txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
+ txEntry.filters(CU.empty0());
+ txEntry.filtersSet(false);
+
+ updateTtl = !cacheCtx.putIfAbsentFilter(filter);
+ }
+
+ if (updateTtl) {
+ if (!read) {
+ ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry);
+
+ if (expiryPlc != null)
+ txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
}
+ else
+ txEntry.ttl(accessTtl);
}
+
+ break; // While.
+ }
+ // If entry cached within transaction got removed before lock.
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
+
+ txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion()));
}
}
}
-
- return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
}
/**
- * @param ctx Cache context.
- * @param key Key.
- * @param expiryPlc Expiry policy.
- * @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
+ * @param txEntry Entry.
+ * @param cacheVal Value.
+ * @param ret Return value to update.
+ * @param ver Entry version.
*/
- protected IgniteCacheExpiryPolicy accessPolicy(
- GridCacheContext ctx,
- IgniteTxKey key,
- @Nullable ExpiryPolicy expiryPlc
- ) {
- return null;
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param keys Keys.
- * @return Expiry policy.
- */
- protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
- return null;
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param topVer Topology version.
- * @param map Return map.
- * @param missedMap Missed keys.
- * @param deserializeBinary Deserialize binary flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects flag.
- * @param skipStore Skip store flag.
- * @param expiryPlc Expiry policy.
- * @return Loaded key-value pairs.
- */
- private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
- final GridCacheContext cacheCtx,
- final AffinityTopologyVersion topVer,
- final Map<K, V> map,
- final Map<KeyCacheObject, GridCacheVersion> missedMap,
- final boolean deserializeBinary,
- final boolean skipVals,
- final boolean keepCacheObjects,
- final boolean skipStore,
- final boolean needVer,
- final ExpiryPolicy expiryPlc
-
- ) {
- if (log.isDebugEnabled())
- log.debug("Loading missed values for missed map: " + missedMap);
-
- final boolean needReadVer = (serializable() && optimistic()) || needVer;
-
- return new GridEmbeddedFuture<>(
- new C2<Void, Exception, Map<K, V>>() {
- @Override public Map<K, V> apply(Void v, Exception e) {
- if (e != null) {
- setRollbackOnly();
-
- throw new GridClosureException(e);
- }
-
- return map;
- }
- },
- loadMissing(
- cacheCtx,
- topVer,
- !skipStore,
- false,
- missedMap.keySet(),
- skipVals,
- needReadVer,
- !deserializeBinary,
- expiryPlc,
- new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
- @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
- if (isRollbackOnly()) {
- if (log.isDebugEnabled())
- log.debug("Ignoring loaded value for read because transaction was rolled back: " +
- IgniteTxLocalAdapter.this);
-
- return;
- }
-
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
- CacheObject visibleVal = cacheVal;
-
- IgniteTxKey txKey = cacheCtx.txKey(key);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- if (txEntry != null) {
- if (!readCommitted())
- txEntry.readValue(cacheVal);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- visibleVal = txEntry.applyEntryProcessors(visibleVal);
- }
-
- assert txEntry != null || readCommitted() || skipVals;
-
- GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached();
-
- if (readCommitted() || skipVals) {
- cacheCtx.evicts().touch(e, topologyVersion());
-
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- needVer ? loadVer : null,
- 0,
- 0);
- }
- }
- else {
- assert txEntry != null;
-
- txEntry.setAndMarkValid(cacheVal);
-
- if (needReadVer) {
- assert loadVer != null;
-
- txEntry.entryReadVersion(loadVer);
- }
-
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- needVer ? loadVer : null,
- 0,
- 0);
- }
- }
- }
- })
- );
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
- final GridCacheContext cacheCtx,
- @Nullable final AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
- final boolean deserializeBinary,
- final boolean skipVals,
- final boolean keepCacheObjects,
- final boolean skipStore,
- final boolean needVer) {
- if (F.isEmpty(keys))
- return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
-
- init();
-
- int keysCnt = keys.size();
-
- boolean single = keysCnt == 1;
-
- try {
- checkValid();
-
- final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
-
- final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
-
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
- ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
-
- final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
- entryTopVer,
- keys,
- expiryPlc,
- retMap,
- missed,
- keysCnt,
- deserializeBinary,
- skipVals,
- keepCacheObjects,
- skipStore,
- needVer);
-
- if (single && missed.isEmpty())
- return new GridFinishedFuture<>(retMap);
-
- // Handle locks.
- if (pessimistic() && !readCommitted() && !skipVals) {
- if (expiryPlc == null)
- expiryPlc = cacheCtx.expiry();
-
- long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
- long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
-
- long timeout = remainingTime();
-
- if (timeout == -1)
- return new GridFinishedFuture<>(timeoutException());
-
- IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
- timeout,
- this,
- true,
- true,
- isolation,
- isInvalidate(),
- createTtl,
- accessTtl);
-
- final ExpiryPolicy expiryPlc0 = expiryPlc;
-
- PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
- @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Acquired transaction lock for read on keys: " + lockKeys);
-
- // Load keys only after the locks have been acquired.
- for (KeyCacheObject cacheKey : lockKeys) {
- K keyVal = (K)
- (keepCacheObjects ? cacheKey :
- cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
-
- if (retMap.containsKey(keyVal))
- // We already have a return value.
- continue;
-
- IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- assert txEntry != null;
-
- // Check if there is cached value.
- while (true) {
- GridCacheEntryEx cached = txEntry.cached();
-
- CacheObject val = null;
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
-
- try {
- Object transformClo =
- (!F.isEmpty(txEntry.entryProcessors()) &&
- cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
- F.first(txEntry.entryProcessors()) : null;
-
- if (needVer) {
- getRes = cached.innerGetVersioned(
- null,
- IgniteTxLocalAdapter.this,
- /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
- /*update-metrics*/true,
- /*event*/!skipVals,
- CU.subjectId(IgniteTxLocalAdapter.this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary(),
- null);
-
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
- }
- }
- else{
- val = cached.innerGet(
- null,
- IgniteTxLocalAdapter.this,
- cacheCtx.isSwapOrOffheapEnabled(),
- /*read-through*/false,
- /*metrics*/true,
- /*events*/!skipVals,
- /*temporary*/false,
- CU.subjectId(IgniteTxLocalAdapter.this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary());
- }
-
- // If value is in cache and passed the filter.
- if (val != null) {
- missed.remove(cacheKey);
-
- txEntry.setAndMarkValid(val);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- cacheCtx.addResult(retMap,
- cacheKey,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
-
- if (readVer != null)
- txEntry.entryReadVersion(readVer);
- }
-
- // Even though we bring the value back from lock acquisition,
- // we still need to recheck primary node for consistent values
- // in case of concurrent transactional locks.
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed exception in get postLock (will retry): " +
- cached);
-
- txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
- }
- }
- }
-
- if (!missed.isEmpty() && cacheCtx.isLocal()) {
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return checkMissed(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- retMap,
- missed,
- deserializeBinary,
- skipVals,
- keepCacheObjects,
- skipStore,
- needVer,
- expiryPlc0);
- }
-
- return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
- }
- };
-
- FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
- @Override Map<K, V> finish(Map<K, V> loaded) {
- retMap.putAll(loaded);
-
- return retMap;
- }
- };
-
- if (fut.isDone()) {
- try {
- IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
-
- return fut1.isDone() ?
- new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
- new GridEmbeddedFuture<>(finClos, fut1);
- }
- catch (GridClosureException e) {
- return new GridFinishedFuture<>(e.unwrap());
- }
- catch (IgniteCheckedException e) {
- try {
- return plc2.apply(false, e);
- }
- catch (Exception e1) {
- return new GridFinishedFuture<>(e1);
- }
- }
- }
- else {
- return new GridEmbeddedFuture<>(
- fut,
- plc2,
- finClos);
- }
- }
- else {
- assert optimistic() || readCommitted() || skipVals;
-
- if (!missed.isEmpty()) {
- if (!readCommitted())
- for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
- KeyCacheObject cacheKey = it.next();
-
- K keyVal =
- (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
-
- if (retMap.containsKey(keyVal))
- it.remove();
- }
-
- if (missed.isEmpty())
- return new GridFinishedFuture<>(retMap);
-
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return checkMissed(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- retMap,
- missed,
- deserializeBinary,
- skipVals,
- keepCacheObjects,
- skipStore,
- needVer,
- expiryPlc);
- }
-
- return new GridFinishedFuture<>(retMap);
- }
- }
- catch (IgniteCheckedException e) {
- setRollbackOnly();
-
- return new GridFinishedFuture<>(e);
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Map<? extends K, ? extends V> map,
- boolean retval
- ) {
- return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
- entryTopVer,
- map,
- null,
- null,
- null,
- retval);
- }
-
- /** {@inheritDoc} */
- @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- V val,
- boolean retval,
- CacheEntryPredicate filter) {
- return putAsync0(cacheCtx,
- entryTopVer,
- key,
- val,
- null,
- null,
- retval,
- filter);
- }
-
- /** {@inheritDoc} */
- @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- EntryProcessor<K, V, Object> entryProcessor,
- Object... invokeArgs) {
- return (IgniteInternalFuture)putAsync0(cacheCtx,
- entryTopVer,
- key,
- null,
- entryProcessor,
- invokeArgs,
- true,
- null);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> putAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheDrInfo> drMap
- ) {
- Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
- @Override public Object apply(GridCacheDrInfo val) {
- return val.value();
- }
- });
-
- return this.<Object, Object>putAllAsync0(cacheCtx,
- null,
- map,
- null,
- null,
- drMap,
- false);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
- Object... invokeArgs
- ) {
- return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
- entryTopVer,
- null,
- map,
- invokeArgs,
- null,
- true);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> removeAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheVersion> drMap
- ) {
- return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
- }
-
- /**
- * Checks filter for non-pessimistic transactions.
- *
- * @param cctx Cache context.
- * @param key Key.
- * @param val Value.
- * @param filter Filter to check.
- * @return {@code True} if passed or pessimistic.
- */
- private boolean filter(
- GridCacheContext cctx,
- KeyCacheObject key,
- CacheObject val,
- CacheEntryPredicate[] filter) {
- return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter);
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param cacheKey Key to enlist.
- * @param val Value.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param entryProcessor Entry processor (for invoke operation).
- * @param invokeArgs Optional arguments for EntryProcessor.
- * @param retval Flag indicating whether a value should be returned.
- * @param lockOnly If {@code true}, then entry will be enlisted as noop.
- * @param filter User filters.
- * @param ret Return value.
- * @param skipStore Skip store flag.
- * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
- * @return Future for entry values loading.
- */
- private <K, V> IgniteInternalFuture<Void> enlistWrite(
- final GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- KeyCacheObject cacheKey,
- Object val,
- @Nullable ExpiryPolicy expiryPlc,
- @Nullable EntryProcessor<K, V, Object> entryProcessor,
- @Nullable Object[] invokeArgs,
- final boolean retval,
- boolean lockOnly,
- final CacheEntryPredicate[] filter,
- final GridCacheReturn ret,
- boolean skipStore,
- final boolean singleRmv,
- boolean keepBinary,
- Byte dataCenterId) {
- try {
- addActiveCache(cacheCtx);
-
- final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
- final boolean needVal = singleRmv || retval || hasFilters;
- final boolean needReadVer = needVal && (serializable() && optimistic());
-
- if (entryProcessor != null)
- transform = true;
-
- GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
-
- boolean loadMissed = enlistWriteEntry(cacheCtx,
- entryTopVer,
- cacheKey,
- val,
- entryProcessor,
- invokeArgs,
- expiryPlc,
- retval,
- lockOnly,
- filter,
- /*drVer*/drVer,
- /*drTtl*/-1L,
- /*drExpireTime*/-1L,
- ret,
- /*enlisted*/null,
- skipStore,
- singleRmv,
- hasFilters,
- needVal,
- needReadVer,
- keepBinary);
-
- if (loadMissed) {
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return loadMissing(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- Collections.singleton(cacheKey),
- filter,
- ret,
- needReadVer,
- singleRmv,
- hasFilters,
- /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
- retval,
- keepBinary,
- expiryPlc);
- }
-
- return new GridFinishedFuture<>();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- /**
- * Internal routine for <tt>putAll(..)</tt>
- *
- * @param cacheCtx Cache context.
- * @param keys Keys to enlist.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param lookup Value lookup map ({@code null} for remove).
- * @param invokeMap Map with entry processors for invoke operation.
- * @param invokeArgs Optional arguments for EntryProcessor.
- * @param retval Flag indicating whether a value should be returned.
- * @param lockOnly If {@code true}, then entry will be enlisted as noop.
- * @param filter User filters.
- * @param ret Return value.
- * @param enlisted Collection of keys enlisted into this transaction.
- * @param drPutMap DR put map (optional).
- * @param drRmvMap DR remove map (optional).
- * @param skipStore Skip store flag.
- * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
- * @param keepBinary Keep binary flag.
- * @param dataCenterId Optional data center ID.
- * @return Future for missing values loading.
- */
- private <K, V> IgniteInternalFuture<Void> enlistWrite(
- final GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<?> keys,
- @Nullable ExpiryPolicy expiryPlc,
- @Nullable Map<?, ?> lookup,
- @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
- @Nullable Object[] invokeArgs,
- final boolean retval,
- boolean lockOnly,
- final CacheEntryPredicate[] filter,
- final GridCacheReturn ret,
- Collection<KeyCacheObject> enlisted,
- @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
- @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
- boolean skipStore,
- final boolean singleRmv,
- final boolean keepBinary,
- Byte dataCenterId
- ) {
- assert retval || invokeMap == null;
-
- try {
- addActiveCache(cacheCtx);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
-
- boolean rmv = lookup == null && invokeMap == null;
-
- final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
- final boolean needVal = singleRmv || retval || hasFilters;
- final boolean needReadVer = needVal && (serializable() && optimistic());
-
- try {
- // Set transform flag for transaction.
- if (invokeMap != null)
- transform = true;
-
- Set<KeyCacheObject> missedForLoad = null;
-
- for (Object key : keys) {
- if (key == null) {
- rollback();
-
- throw new NullPointerException("Null key.");
- }
-
- Object val = rmv || lookup == null ? null : lookup.get(key);
- EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
-
- GridCacheVersion drVer;
- long drTtl;
- long drExpireTime;
-
- if (drPutMap != null) {
- GridCacheDrInfo info = drPutMap.get(key);
-
- assert info != null;
-
- drVer = info.version();
- drTtl = info.ttl();
- drExpireTime = info.expireTime();
- }
- else if (drRmvMap != null) {
- assert drRmvMap.get(key) != null;
-
- drVer = drRmvMap.get(key);
- drTtl = -1L;
- drExpireTime = -1L;
- }
- else if (dataCenterId != null) {
- drVer = cctx.versions().next(dataCenterId);
- drTtl = -1L;
- drExpireTime = -1L;
- }
- else {
- drVer = null;
- drTtl = -1L;
- drExpireTime = -1L;
- }
-
- if (!rmv && val == null && entryProcessor == null) {
- setRollbackOnly();
-
- throw new NullPointerException("Null value.");
- }
-
- KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
-
- boolean loadMissed = enlistWriteEntry(cacheCtx,
- entryTopVer,
- cacheKey,
- val,
- entryProcessor,
- invokeArgs,
- expiryPlc,
- retval,
- lockOnly,
- filter,
- drVer,
- drTtl,
- drExpireTime,
- ret,
- enlisted,
- skipStore,
- singleRmv,
- hasFilters,
- needVal,
- needReadVer,
- keepBinary);
-
- if (loadMissed) {
- if (missedForLoad == null)
- missedForLoad = new HashSet<>();
-
- missedForLoad.add(cacheKey);
- }
- }
-
- if (missedForLoad != null) {
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return loadMissing(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- missedForLoad,
- filter,
- ret,
- needReadVer,
- singleRmv,
- hasFilters,
- /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
- retval,
- keepBinary,
- expiryPlc);
- }
-
- return new GridFinishedFuture<>();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param keys Keys to load.
- * @param filter Filter.
- * @param ret Return value.
- * @param needReadVer Read version flag.
- * @param singleRmv {@code True} for single remove operation.
- * @param hasFilters {@code True} if filters not empty.
- * @param readThrough Read through flag.
- * @param retval Return value flag.
- * @param expiryPlc Expiry policy.
- * @return Load future.
- */
- private IgniteInternalFuture<Void> loadMissing(
- final GridCacheContext cacheCtx,
- final AffinityTopologyVersion topVer,
- final Set<KeyCacheObject> keys,
- final CacheEntryPredicate[] filter,
- final GridCacheReturn ret,
- final boolean needReadVer,
- final boolean singleRmv,
- final boolean hasFilters,
- final boolean readThrough,
- final boolean retval,
- final boolean keepBinary,
- final ExpiryPolicy expiryPlc) {
- GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
- new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
- @Override public void apply(KeyCacheObject key,
- @Nullable Object val,
- @Nullable GridCacheVersion loadVer) {
- if (log.isDebugEnabled())
- log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
-
- IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
-
- assert e != null;
-
- if (needReadVer) {
- assert loadVer != null;
-
- e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
- }
-
- if (singleRmv) {
- assert !hasFilters && !retval;
- assert val == null || Boolean.TRUE.equals(val) : val;
-
- ret.set(cacheCtx, null, val != null, keepBinary);
- }
- else {
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
- if (e.op() == TRANSFORM) {
- GridCacheVersion ver;
-
- e.readValue(cacheVal);
-
- try {
- ver = e.cached().version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : e;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(e, cacheVal, ret, ver);
- }
- else {
- boolean success;
-
- if (hasFilters) {
- success = isAll(e.context(), key, cacheVal, filter);
-
- if (!success)
- e.value(cacheVal, false, false);
- }
- else
- success = true;
-
- ret.set(cacheCtx, cacheVal, success, keepBinary);
- }
- }
- }
- };
-
- return loadMissing(
- cacheCtx,
- topVer,
- readThrough,
- /*async*/true,
- keys,
- /*skipVals*/singleRmv,
- needReadVer,
- keepBinary,
- expiryPlc,
- c);
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param cacheKey Key.
- * @param val Value.
- * @param entryProcessor Entry processor.
- * @param invokeArgs Optional arguments for EntryProcessor.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param retval Return value flag.
- * @param lockOnly Lock only flag.
- * @param filter Filter.
- * @param drVer DR version.
- * @param drTtl DR ttl.
- * @param drExpireTime DR expire time.
- * @param ret Return value.
- * @param enlisted Enlisted keys collection.
- * @param skipStore Skip store flag.
- * @param singleRmv {@code True} for single remove operation.
- * @param hasFilters {@code True} if filters not empty.
- * @param needVal {@code True} if value is needed.
- * @param needReadVer {@code True} if need read entry version.
- * @return {@code True} if entry value should be loaded.
- * @throws IgniteCheckedException If failed.
- */
- private boolean enlistWriteEntry(GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- final KeyCacheObject cacheKey,
- @Nullable final Object val,
- @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
- @Nullable final Object[] invokeArgs,
- @Nullable final ExpiryPolicy expiryPlc,
- final boolean retval,
- final boolean lockOnly,
- final CacheEntryPredicate[] filter,
- final GridCacheVersion drVer,
- final long drTtl,
- long drExpireTime,
- final GridCacheReturn ret,
- @Nullable final Collection<KeyCacheObject> enlisted,
- boolean skipStore,
- boolean singleRmv,
- boolean hasFilters,
- final boolean needVal,
- boolean needReadVer,
- boolean keepBinary
- ) throws IgniteCheckedException {
- boolean loadMissed = false;
-
- final boolean rmv = val == null && entryProcessor == null;
-
- IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- // First time access.
- if (txEntry == null) {
- while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
-
- try {
- entry.unswap(false);
-
- // Check if lock is being explicitly acquired by the same thread.
- if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
- entry.lockedByThread(threadId, xidVer)) {
- throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
- "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
- ", entry=" + entry +
- ", xidVer=" + xidVer +
- ", threadId=" + threadId +
- ", locNodeId=" + cctx.localNodeId() + ']');
- }
-
- CacheObject old = null;
- GridCacheVersion readVer = null;
-
- if (optimistic() && !implicit()) {
- try {
- if (needReadVer) {
- EntryGetResult res = primaryLocal(entry) ?
- entry.innerGetVersioned(
- null,
- this,
- /*swap*/false,
- /*unmarshal*/retval || needVal,
- /*metrics*/retval,
- /*events*/retval,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null,
- keepBinary,
- null) : null;
-
- if (res != null) {
- old = res.value();
- readVer = res.version();
- }
- }
- else {
- old = entry.innerGet(
- null,
- this,
- /*swap*/false,
- /*read-through*/false,
- /*metrics*/retval,
- /*events*/retval,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null,
- keepBinary);
- }
- }
- catch (ClusterTopologyCheckedException e) {
- entry.context().evicts().touch(entry, topologyVersion());
-
- throw e;
- }
- }
- else
- old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
-
- final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
- entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
-
- if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
- ret.set(cacheCtx, old, false, keepBinary);
-
- if (!readCommitted()) {
- if (optimistic() && serializable()) {
- txEntry = addEntry(op,
- old,
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
- }
- else {
- txEntry = addEntry(READ,
- old,
- null,
- null,
- entry,
- null,
- CU.empty0(),
- false,
- -1L,
- -1L,
- null,
- skipStore,
- keepBinary);
- }
-
- txEntry.markValid();
-
- if (needReadVer) {
- assert readVer != null;
-
- txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
- }
-
- if (readCommitted())
- cacheCtx.evicts().touch(entry, topologyVersion());
-
- break; // While.
- }
-
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
-
- if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
- cacheCtx.evicts().touch(entry, topologyVersion());
-
- if (enlisted != null)
- enlisted.add(cacheKey);
-
- if (!pessimistic() && !implicit()) {
- txEntry.markValid();
-
- if (old == null) {
- if (needVal)
- loadMissed = true;
- else {
- assert !implicit() || !transform : this;
- assert txEntry.op() != TRANSFORM : txEntry;
-
- if (retval)
- ret.set(cacheCtx, null, true, keepBinary);
- else
- ret.success(true);
- }
- }
- else {
- if (needReadVer) {
- assert readVer != null;
-
- txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
-
- if (retval && !transform)
- ret.set(cacheCtx, old, true, keepBinary);
- else {
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
-
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : txEntry;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version " +
- "[err=" + ex.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(txEntry, old, ret, ver);
- }
- else
- ret.success(true);
- }
- }
- }
- // Pessimistic.
- else {
- if (retval && !transform)
- ret.set(cacheCtx, old, true, keepBinary);
- else
- ret.success(true);
- }
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction putAll0 method: " + entry);
- }
- }
- }
- else {
- if (entryProcessor == null && txEntry.op() == TRANSFORM)
- throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
- "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
-
- GridCacheEntryEx entry = txEntry.cached();
-
- CacheObject v = txEntry.value();
-
- boolean del = txEntry.op() == DELETE && rmv;
-
- if (!del) {
- if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
- ret.set(cacheCtx, v, false, keepBinary);
-
- return loadMissed;
- }
-
- GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
- v != null ? UPDATE : CREATE;
-
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
-
- if (enlisted != null)
- enlisted.add(cacheKey);
-
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
-
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException e) {
- assert optimistic() : txEntry;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(txEntry, txEntry.value(), ret, ver);
- }
- }
-
- if (!pessimistic()) {
- txEntry.markValid();
-
- if (retval && !transform)
- ret.set(cacheCtx, v, true, keepBinary);
- else
- ret.success(true);
- }
- }
-
- return loadMissed;
- }
-
- /**
- * @param cctx Cache context.
- * @param key Key.
- * @param val Value.
- * @param filter Filter.
- * @return {@code True} if filter passed.
- */
- private boolean isAll(GridCacheContext cctx,
- KeyCacheObject key,
- CacheObject val,
- CacheEntryPredicate[] filter) {
- GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
- @Nullable @Override public CacheObject peekVisibleValue() {
- return rawGet();
- }
- };
-
- for (CacheEntryPredicate p0 : filter) {
- if (p0 != null && !p0.apply(e))
- return false;
- }
-
- return true;
- }
-
- /**
- * Post lock processing for put or remove.
- *
- * @param cacheCtx Context.
- * @param keys Keys.
- * @param ret Return value.
- * @param rmv {@code True} if remove.
- * @param retval Flag to return value or not.
- * @param read {@code True} if read.
- * @param accessTtl TTL for read operation.
- * @param filter Filter to check entries.
- * @throws IgniteCheckedException If error.
- * @param computeInvoke If {@code true} computes return value for invoke operation.
- */
- @SuppressWarnings("unchecked")
- protected final void postLockWrite(
- GridCacheContext cacheCtx,
- Iterable<KeyCacheObject> keys,
- GridCacheReturn ret,
- boolean rmv,
- boolean retval,
- boolean read,
- long accessTtl,
- CacheEntryPredicate[] filter,
- boolean computeInvoke
- ) throws IgniteCheckedException {
- for (KeyCacheObject k : keys) {
- IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
-
- if (txEntry == null)
- throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
- "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
-
- while (true) {
- GridCacheEntryEx cached = txEntry.cached();
-
- try {
- assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
- "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
- ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
-
- if (log.isDebugEnabled())
- log.debug("Post lock write entry: " + cached);
-
- CacheObject v = txEntry.previousValue();
- boolean hasPrevVal = txEntry.hasPreviousValue();
-
- if (onePhaseCommit())
- filter = txEntry.filters();
-
- // If we have user-passed filter, we must read value into entry for peek().
- if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
- retval = true;
-
- boolean invoke = txEntry.op() == TRANSFORM;
-
- if (retval || invoke) {
- if (!cacheCtx.isNear()) {
- if (!hasPrevVal) {
- // For non-local cache should read from store after lock on primary.
- boolean readThrough = cacheCtx.isLocal() &&
- (invoke || cacheCtx.loadPreviousValue()) &&
- !txEntry.skipStore();
-
- v = cached.innerGet(
- null,
- this,
- /*swap*/true,
- readThrough,
- /*metrics*/!invoke,
- /*event*/!invoke && !dht(),
- /*temporary*/false,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- null,
- txEntry.keepBinary());
- }
- }
- else {
- if (!hasPrevVal)
- v = cached.rawGetOrUnmarshal(false);
- }
-
- if (txEntry.op() == TRANSFORM) {
- if (computeInvoke) {
- GridCacheVersion ver;
-
- try {
- ver = cached.version();
- }
- catch (GridCacheEntryRemovedException e) {
- assert optimistic() : txEntry;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(txEntry, v, ret, ver);
- }
- }
- else
- ret.value(cacheCtx, v, txEntry.keepBinary());
- }
-
- boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
-
- // For remove operation we return true only if we are removing s/t,
- // i.e. cached value is not null.
- ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
-
- if (onePhaseCommit())
- txEntry.filtersPassed(pass);
-
- boolean updateTtl = read;
-
- if (pass) {
- txEntry.markValid();
-
- if (log.isDebugEnabled())
- log.debug("Filter passed in post lock for key: " + k);
- }
- else {
- // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
- txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
- txEntry.filters(CU.empty0());
- txEntry.filtersSet(false);
-
- updateTtl = !cacheCtx.putIfAbsentFilter(filter);
- }
-
- if (updateTtl) {
- if (!read) {
- ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry);
-
- if (expiryPlc != null)
- txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
- }
- else
- txEntry.ttl(accessTtl);
- }
-
- break; // While.
- }
- // If entry cached within transaction got removed before lock.
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
-
- txEntry.cached(entryEx(cached.context(), txEntry.txK
<TRUNCATED>
[3/5] ignite git commit: ignite-4768 txs
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index d448446..7a69a6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -27,10 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.UUID;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
@@ -61,11 +63,17 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -904,7 +912,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private static class SessionData {
/** */
@GridToStringExclude
- private final IgniteInternalTx tx;
+ private final TxProxy tx;
/** */
private String cacheName;
@@ -914,7 +922,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private Map<Object, Object> props;
/** */
- private Object attachment;
+ private Object attach;
/** */
private final Set<CacheStoreManager> started =
@@ -927,8 +935,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
* @param tx Current transaction.
* @param cacheName Cache name.
*/
- private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) {
- this.tx = tx;
+ private SessionData(@Nullable final IgniteInternalTx tx, @Nullable String cacheName) {
+ this.tx = tx != null ? new TxProxy(tx) : null;
this.cacheName = cacheName;
}
@@ -936,7 +944,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
* @return Transaction.
*/
@Nullable private Transaction transaction() {
- return tx != null ? tx.proxy() : null;
+ return tx;
}
/**
@@ -950,12 +958,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/**
- * @param attachment Attachment.
+ * @param attach Attachment.
*/
- private Object attach(Object attachment) {
- Object prev = this.attachment;
+ private Object attach(Object attach) {
+ Object prev = this.attach;
- this.attachment = attachment;
+ this.attach = attach;
return prev;
}
@@ -964,7 +972,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
* @return Attachment.
*/
private Object attachment() {
- return attachment;
+ return attach;
}
/**
@@ -998,7 +1006,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(SessionData.class, this, "tx", CU.txString(tx));
+ return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? tx.tx : null));
}
}
@@ -1298,4 +1306,116 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
}
}
+
+ /**
+ * Delegating {@link Transaction} facade over {@link IgniteInternalTx}; unsupported operations throw.
+ */
+ private static class TxProxy implements Transaction {
+ /** Wrapped internal transaction that the supported methods delegate to. */
+ private final IgniteInternalTx tx;
+
+ /**
+ * @param tx Internal transaction to wrap, must not be {@code null}.
+ */
+ TxProxy(IgniteInternalTx tx) {
+ assert tx != null;
+
+ this.tx = tx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid xid() {
+ return tx.xid();
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID nodeId() {
+ return tx.nodeId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long threadId() {
+ return tx.threadId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return tx.startTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public TransactionIsolation isolation() {
+ return tx.isolation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public TransactionConcurrency concurrency() {
+ return tx.concurrency();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean implicit() {
+ return tx.implicit();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isInvalidate() {
+ return tx.isInvalidate();
+ }
+
+ /** {@inheritDoc} */
+ @Override public TransactionState state() {
+ return tx.state();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long timeout() {
+ return tx.timeout();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public long timeout(long timeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean setRollbackOnly() {
+ return tx.setRollbackOnly();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRollbackOnly() {
+ return tx.isRollbackOnly();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public void commit() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public void close() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public void rollback() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public IgniteAsyncSupport withAsync() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Always {@code false} for this proxy. */
+ @Override public boolean isAsync() {
+ return false;
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public <R> IgniteFuture<R> future() {
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index dd900fe..e3adfc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -57,9 +57,6 @@ public interface IgniteInternalTx extends AutoCloseable {
/** Transaction is being finalized by user. */
USER_FINISH,
- /** Recovery request is received, user finish requests should be ignored. */
- RECOVERY_WAIT,
-
/** Transaction is being finalized by recovery procedure. */
RECOVERY_FINISH
}
@@ -689,11 +686,6 @@ public interface IgniteInternalTx extends AutoCloseable {
public boolean hasTransforms();
/**
- * @return Public API proxy.
- */
- public TransactionProxy proxy();
-
- /**
* @param topVer New topology version.
*/
public void onRemap(AffinityTopologyVersion topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 7c7b5a8..ddafbac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.Transaction;
@@ -91,7 +92,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(
+ @Override public GridNearTxLocal txStartEx(
GridCacheContext ctx,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
@@ -113,7 +114,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(
+ @Override public GridNearTxLocal txStartEx(
GridCacheContext ctx,
TransactionConcurrency concurrency,
TransactionIsolation isolation)
@@ -141,7 +142,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
* @return Transaction.
*/
@SuppressWarnings("unchecked")
- private IgniteInternalTx txStart0(
+ private GridNearTxLocal txStart0(
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -151,11 +152,12 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
cctx.kernalContext().gateway().readLock();
try {
- IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx);
+ GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx);
if (tx != null)
throw new IllegalStateException("Failed to start new transaction " +
"(current thread already has a transaction): " + tx);
+
tx = cctx.tm().newTx(
false,
false,
@@ -178,7 +180,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
/** {@inheritDoc} */
@Nullable @Override public Transaction tx() {
- IgniteInternalTx tx = cctx.tm().userTx();
+ GridNearTxLocal tx = cctx.tm().userTx();
return tx != null ? tx.proxy() : null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b07a117..98f1140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -251,10 +251,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/** Store used flag. */
protected boolean storeEnabled = true;
- /** */
- @GridToStringExclude
- private TransactionProxyImpl proxy;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -546,15 +542,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
break;
- case RECOVERY_WAIT:
- FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT);
-
- FinalizationStatus cur = finalizing;
-
- res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH;
-
- break;
-
case RECOVERY_FINISH:
FinalizationStatus old = finalizing;
@@ -564,7 +551,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
default:
throw new IllegalArgumentException("Cannot set finalization status: " + status);
-
}
if (res) {
@@ -1257,7 +1243,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
* @throws IgniteCheckedException If batch update failed.
*/
@SuppressWarnings({"CatchGenericClass"})
- protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
+ protected final void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
if (!storeEnabled() || internal() ||
(!local() && near())) // No need to work with local store at GridNearTxRemote.
return;
@@ -1806,14 +1792,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public TransactionProxy proxy() {
- if (proxy == null)
- proxy = new TransactionProxyImpl(this, cctx, false);
-
- return proxy;
- }
-
- /** {@inheritDoc} */
@Override public boolean equals(Object o) {
return o == this || (o instanceof IgniteTxAdapter && xidVer.equals(((IgniteTxAdapter)o).xidVer));
}
@@ -2398,11 +2376,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public TransactionProxy proxy() {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public boolean equals(Object o) {
return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 14a7ed0..e1d12af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -423,7 +423,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/**
* @param val Value to set.
*/
- void setAndMarkValid(CacheObject val) {
+ public void setAndMarkValid(CacheObject val) {
setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue());
}
@@ -451,7 +451,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
* Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible
* to further peek operations.
*/
- void markValid() {
+ public void markValid() {
prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 56a7fa2..331ca31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1551,7 +1551,7 @@ public class IgniteTxHandler {
// Prepare prior to reordering, so the pending locks added
// in prepare phase will get properly ordered as well.
- tx.prepare();
+ tx.prepareRemoteTx();
if (req.last()) {
assert !F.isEmpty(req.transactionNodes()) :
@@ -1644,7 +1644,7 @@ public class IgniteTxHandler {
// Prepare prior to reordering, so the pending locks added
// in prepare phase will get properly ordered as well.
- tx.prepare();
+ tx.prepareRemoteTx();
if (req.last())
tx.state(PREPARED);
[5/5] ignite git commit: ignite-4768 txs
Posted by sb...@apache.org.
ignite-4768 txs
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5523eac7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5523eac7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5523eac7
Branch: refs/heads/ignite-4768-1
Commit: 5523eac7d099da356732d12c3353e75c5dde044f
Parents: 901be4f
Author: sboikov <sb...@gridgain.com>
Authored: Tue Mar 14 16:39:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 14 18:15:38 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/IgniteTransactionsEx.java | 7 +-
.../processors/cache/GridCacheAdapter.java | 85 +-
.../processors/cache/GridCacheProxyImpl.java | 3 +-
.../cache/GridCacheSharedContext.java | 3 +-
.../processors/cache/IgniteInternalCache.java | 3 +-
.../GridDistributedCacheAdapter.java | 2 +-
.../GridDistributedTxRemoteAdapter.java | 22 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 10 +-
.../dht/colocated/GridDhtColocatedCache.java | 8 +-
.../near/GridNearTransactionalCache.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 2698 ++++++++++++++++-
.../store/GridCacheStoreManagerAdapter.java | 142 +-
.../cache/transactions/IgniteInternalTx.java | 8 -
.../transactions/IgniteTransactionsImpl.java | 12 +-
.../cache/transactions/IgniteTxAdapter.java | 29 +-
.../cache/transactions/IgniteTxEntry.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 4 +-
.../transactions/IgniteTxLocalAdapter.java | 2777 +-----------------
.../cache/transactions/IgniteTxLocalEx.java | 136 -
.../cache/transactions/IgniteTxManager.java | 142 +-
.../transactions/TransactionProxyImpl.java | 13 +-
.../HibernateReadWriteAccessStrategy.java | 11 +-
.../processors/cache/jta/CacheJtaManager.java | 3 +-
.../processors/cache/jta/CacheJtaResource.java | 7 +-
24 files changed, 3087 insertions(+), 3046 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
index 9772dcc..14791ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -35,7 +36,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions {
* @param txSize Number of entries participating in transaction (may be approximate).
* @return New transaction.
*/
- public IgniteInternalTx txStartEx(GridCacheContext ctx,
+ public GridNearTxLocal txStartEx(GridCacheContext ctx,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -47,5 +48,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions {
* @param isolation Isolation.
* @return New transaction.
*/
- public IgniteInternalTx txStartEx(GridCacheContext ctx, TransactionConcurrency concurrency, TransactionIsolation isolation);
+ public GridNearTxLocal txStartEx(GridCacheContext ctx,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 71be718..c98cd24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
@@ -1876,7 +1877,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
- IgniteTxLocalAdapter tx = null;
+ GridNearTxLocal tx = null;
if (checkTx) {
try {
@@ -2132,7 +2133,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
else {
return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
- @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Map<K1, V1>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.getAllAsync(ctx,
readyTopVer,
keys,
@@ -2187,7 +2188,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected V getAndPut0(final K key, final V val, @Nullable final CacheEntryPredicate filter)
throws IgniteCheckedException {
return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException {
return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value();
}
@@ -2237,7 +2238,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Nullable final CacheEntryPredicate filter)
{
return asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAsync(ctx, readyTopVer, key, val, true, filter)
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
@@ -2293,7 +2294,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected boolean put0(final K key, final V val, final CacheEntryPredicate filter)
throws IgniteCheckedException {
Boolean res = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException {
return tx.putAsync(ctx, null, key, val, false, filter).get().success();
}
@@ -2316,7 +2317,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(drMap.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
tx.putAllDrAsync(ctx, drMap).get();
}
@@ -2335,7 +2336,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncOp(drMap.keySet()) {
- @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAllDrAsync(ctx, drMap);
}
@@ -2380,7 +2381,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
throws IgniteCheckedException {
assert topVer == null || tx.implicit();
@@ -2418,7 +2419,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
@@ -2448,7 +2449,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() {
- @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
@@ -2491,7 +2492,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx,
+ @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx,
AffinityTopologyVersion readyTopVer) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
@Override public EntryProcessor apply(K k) {
@@ -2532,7 +2533,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.invokeAsync(ctx,
readyTopVer,
(Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map,
@@ -2568,7 +2569,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
throws IgniteCheckedException {
IgniteInternalFuture<GridCacheReturn> fut =
tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
@@ -2616,7 +2617,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
@Nullable final CacheEntryPredicate filter) {
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAsync(ctx,
readyTopVer,
key,
@@ -2721,7 +2722,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected void putAll0(final Map<? extends K, ? extends V> m) throws IgniteCheckedException {
syncOp(new SyncInOp(m.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
tx.putAllAsync(ctx, null, m, false).get();
}
@@ -2748,7 +2749,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<?> putAllAsync0(final Map<? extends K, ? extends V> m) {
return asyncOp(new AsyncOp(m.keySet()) {
- @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAllAsync(ctx,
readyTopVer,
m,
@@ -2789,7 +2790,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean keepBinary = ctx.keepBinary();
return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException {
K key0 = keepBinary ? (K) ctx.toCacheKeyObject(key) : key;
V ret = tx.removeAllAsync(ctx,
@@ -2839,7 +2840,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<V> getAndRemoveAsync0(final K key) {
return asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
// TODO should we invoke interceptor here?
return tx.removeAllAsync(ctx,
readyTopVer,
@@ -2897,7 +2898,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected void removeAll0(final Collection<? extends K> keys) throws IgniteCheckedException {
syncOp(new SyncInOp(keys.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx,
null,
keys,
@@ -2938,7 +2939,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<Object> removeAllAsync0(final Collection<? extends K> keys) {
return asyncOp(new AsyncOp(keys) {
- @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.removeAllAsync(ctx,
readyTopVer,
keys,
@@ -2990,7 +2991,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected boolean remove0(final K key, final CacheEntryPredicate filter) throws IgniteCheckedException {
Boolean res = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException {
return tx.removeAllAsync(ctx,
null,
Collections.singletonList(key),
@@ -3046,7 +3047,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable final CacheEntryPredicate filter) {
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.removeAllAsync(ctx,
readyTopVer,
Collections.singletonList(key),
@@ -3071,8 +3072,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(false) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- tx.removeAllDrAsync(ctx, (Map)drMap).get();
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
+ tx.removeAllDrAsync(ctx, drMap).get();
}
@Override public String toString() {
@@ -3090,8 +3091,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncOp(drMap.keySet()) {
- @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- return tx.removeAllDrAsync(ctx, (Map)drMap);
+ @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
+ return tx.removeAllDrAsync(ctx, drMap);
}
@Override public String toString() {
@@ -3160,10 +3161,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public Transaction tx() {
- IgniteTxAdapter tx = ctx.tm().threadLocalTx(ctx);
-
- return tx == null ? null : new TransactionProxyImpl<>(tx, ctx.shared(), false);
+ @Nullable @Override public GridNearTxLocal tx() {
+ return ctx.tm().threadLocalTx(ctx);
}
/** {@inheritDoc} */
@@ -4142,7 +4141,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Transaction commit future.
*/
@SuppressWarnings("unchecked")
- public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final IgniteInternalTx tx) {
+ IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) {
FutureHolder holder = lastFut.get();
holder.lock();
@@ -4208,7 +4207,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
awaitLastFut();
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
if (tx == null || tx.implicit()) {
TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config());
@@ -4304,7 +4303,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (log.isDebugEnabled())
log.debug("Performing async op: " + op);
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4348,7 +4347,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@SuppressWarnings("unchecked")
protected <T> IgniteInternalFuture<T> asyncOp(
- IgniteTxLocalAdapter tx,
+ GridNearTxLocal tx,
final AsyncOp<T> op,
final CacheOperationContext opCtx
) {
@@ -4364,7 +4363,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
try {
IgniteInternalFuture fut = holder.future();
- final IgniteTxLocalAdapter tx0 = tx;
+ final GridNearTxLocal tx0 = tx;
if (fut != null && !fut.isDone()) {
IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
@@ -4925,7 +4924,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private int retries;
/** */
- private IgniteTxLocalAdapter tx;
+ private GridNearTxLocal tx;
/** */
private CacheOperationContext opCtx;
@@ -5173,7 +5172,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Operation return value.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public abstract T op(IgniteTxLocalAdapter tx) throws IgniteCheckedException;
+ @Nullable public abstract T op(GridNearTxLocal tx) throws IgniteCheckedException;
}
/**
@@ -5188,7 +5187,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public final Object op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Nullable @Override public final Object op(GridNearTxLocal tx) throws IgniteCheckedException {
inOp(tx);
return null;
@@ -5198,7 +5197,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param tx Transaction.
* @throws IgniteCheckedException If failed.
*/
- public abstract void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException;
+ public abstract void inOp(GridNearTxLocal tx) throws IgniteCheckedException;
}
/**
@@ -5234,14 +5233,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param readyTopVer Ready topology version.
* @return Operation future.
*/
- public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer);
+ public abstract IgniteInternalFuture<T> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer);
/**
* @param tx Transaction.
* @param opCtx Operation context.
* @return Operation future.
*/
- public IgniteInternalFuture<T> op(final IgniteTxLocalAdapter tx, CacheOperationContext opCtx) {
+ public IgniteInternalFuture<T> op(final GridNearTxLocal tx, CacheOperationContext opCtx) {
AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
if (txTopVer != null)
@@ -5267,7 +5266,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
private IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut,
final AffinityTopologyVersion topVer,
- final IgniteTxLocalAdapter tx,
+ final GridNearTxLocal tx,
final CacheOperationContext opCtx) {
final GridFutureAdapter fut0 = new GridFutureAdapter();
@@ -5304,7 +5303,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param opCtx Operation context.
* @return Future.
*/
- private IgniteInternalFuture<T> runOp(IgniteTxLocalAdapter tx,
+ private IgniteInternalFuture<T> runOp(GridNearTxLocal tx,
AffinityTopologyVersion topVer,
CacheOperationContext opCtx) {
ctx.operationContextPerCall(opCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 00898ec..731d23b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -977,7 +978,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
- @Override public Transaction tx() {
+ @Override public GridNearTxLocal tx() {
CacheOperationContext prev = gate.enter(opCtx);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 0f79100..e4f8fec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -742,7 +743,7 @@ public class GridCacheSharedContext<K, V> {
* @return Commit future.
*/
@SuppressWarnings("unchecked")
- public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx) {
+ public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(GridNearTxLocal tx) {
GridCacheContext ctx = tx.txState().singleCacheContext(this);
if (ctx == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 0ac98fb..02b6461 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -976,7 +977,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
* @return Transaction started by this thread or {@code null} if this thread
* does not have a transaction.
*/
- @Nullable public Transaction tx();
+ @Nullable public GridNearTxLocal tx();
/**
* Evicts entry associated with given key from cache. Note, that entry will be evicted
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 7e4deff..00bc6d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -111,7 +111,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout) {
- IgniteTxLocalEx tx = ctx.tm().userTxx();
+ IgniteTxLocalEx tx = ctx.tm().userTx();
// Return value flag is true because we choose to bring values for explicit locks.
return lockAllAsync(ctx.cacheKeysView(keys),
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 68c0e57..e60d3dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -83,7 +83,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
* Transaction created by system implicitly on remote nodes.
*/
-public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
+public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
implements IgniteTxRemoteEx {
/** */
private static final long serialVersionUID = 0L;
@@ -180,11 +180,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public Collection<UUID> masterNodeIds() {
- return Collections.singleton(nodeId);
- }
-
- /** {@inheritDoc} */
@Override public UUID originatingNodeId() {
return nodeId;
}
@@ -378,11 +373,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/**
- * Prepare phase.
- *
- * @throws IgniteCheckedException If prepare failed.
+ * @throws IgniteCheckedException If failed.
*/
- @Override public void prepare() throws IgniteCheckedException {
+ public final void prepareRemoteTx() throws IgniteCheckedException {
// If another thread is doing prepare or rollback.
if (!state(PREPARING)) {
// In optimistic mode prepare may be called multiple times.
@@ -408,6 +401,15 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/**
+ * Prepare phase.
+ *
+ * @throws IgniteCheckedException If prepare failed.
+ */
+ @Override public void prepare() throws IgniteCheckedException {
+ prepareRemoteTx();
+ }
+
+ /**
* @throws IgniteCheckedException If commit failed.
*/
@SuppressWarnings({"CatchGenericClass"})
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 67e1993..c7967d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -161,7 +161,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @param node Node.
*/
- public void addLockTransactionNode(ClusterNode node) {
+ void addLockTransactionNode(ClusterNode node) {
assert node != null;
assert !node.isLocal();
@@ -185,7 +185,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
*
* @return Has near cache flag.
*/
- public boolean nearOnOriginatingNode() {
+ boolean nearOnOriginatingNode() {
return nearOnOriginatingNode;
}
@@ -206,7 +206,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @return Nodes where transactions were started on lock step.
*/
- @Nullable public Set<ClusterNode> lockTransactionNodes() {
+ @Nullable Set<ClusterNode> lockTransactionNodes() {
return lockTxNodes;
}
@@ -349,14 +349,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @param mappings Mappings to add.
*/
- void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
+ private void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
addMapping(mappings, dhtMap);
}
/**
* @param mappings Mappings to add.
*/
- void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
+ private void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
addMapping(mappings, nearMap);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index e1e0ec2..03bbfe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -207,13 +207,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keyCheck)
validateCacheKey(key);
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
final CacheOperationContext opCtx = ctx.operationContextPerCall();
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
readyTopVer,
Collections.singleton(ctx.toCacheKeyObject(key)),
@@ -289,13 +289,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keyCheck)
validateCacheKeys(keys);
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
final CacheOperationContext opCtx = ctx.operationContextPerCall();
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
- @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.getAllAsync(ctx,
readyTopVer,
ctx.cacheKeysView(keys),
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 940dd80..c39af34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -133,7 +133,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (keyCheck)
validateCacheKeys(keys);
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -141,7 +141,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
- @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.getAllAsync(ctx,
readyTopVer,
ctx.cacheKeysView(keys),