You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/16 15:25:57 UTC
[2/7] ignite git commit: Internal cache API cleanup.
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index f5687a0..307c348 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -17,21 +17,8 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.Collection;
-import java.util.Map;
-import javax.cache.Cache;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.jetbrains.annotations.Nullable;
/**
@@ -59,141 +46,11 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
public void userRollback() throws IgniteCheckedException;
/**
- * @param cacheCtx Cache context.
- * @param keys Keys to get.
- * @param deserializeBinary Deserialize binary flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects
- * @param skipStore Skip store flag.
- * @return Future for this get.
- */
- public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
- boolean deserializeBinary,
- boolean skipVals,
- boolean keepCacheObjects,
- boolean skipStore,
- boolean needVer);
-
- /**
- * @param cacheCtx Cache context.
- * @param map Map to put.
- * @param retval Flag indicating whether a value should be returned.
- * @return Future for put operation.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Map<? extends K, ? extends V> map,
- boolean retval);
-
- /**
- * @param cacheCtx Cache context.
- * @param key Key.
- * @param val Value.
- * @param retval Return value flag.
- * @param filter Filter.
- * @return Future for put operation.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- V val,
- boolean retval,
- CacheEntryPredicate filter);
-
- /**
- * @param cacheCtx Cache context.
- * @param key Key.
- * @param entryProcessor Entry processor.
- * @param invokeArgs Optional arguments for entry processor.
- * @return Operation future.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- EntryProcessor<K, V, Object> entryProcessor,
- Object... invokeArgs);
-
- /**
- * @param cacheCtx Cache context.
- * @param map Entry processors map.
- * @param invokeArgs Optional arguments for entry processor.
- * @return Operation future.
- */
- public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
- Object... invokeArgs);
-
- /**
- * @param cacheCtx Cache context.
- * @param keys Keys to remove.
- * @param retval Flag indicating whether a value should be returned.
- * @param filter Filter.
- * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
- * @return Future for asynchronous remove.
- */
- public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<? extends K> keys,
- boolean retval,
- CacheEntryPredicate filter,
- boolean singleRmv);
-
- /**
- * @param cacheCtx Cache context.
- * @param drMap DR map to put.
- * @return Future for DR put operation.
- */
- public IgniteInternalFuture<?> putAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheDrInfo> drMap);
-
- /**
- * @param cacheCtx Cache context.
- * @param drMap DR map.
- * @return Future for asynchronous remove.
- */
- public IgniteInternalFuture<?> removeAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheVersion> drMap);
-
- /**
* Finishes transaction (either commit or rollback).
*
* @param commit {@code True} if commit, {@code false} if rollback.
* @return {@code True} if state has been changed.
* @throws IgniteCheckedException If finish failed.
*/
- public boolean finish(boolean commit) throws IgniteCheckedException;
-
- /**
- * @param cacheCtx Cache context.
- * @param readThrough Read through flag.
- * @param async if {@code True}, then loading will happen in a separate thread.
- * @param keys Keys.
- * @param skipVals Skip values flag.
- * @param needVer If {@code true} version is required for loaded values.
- * @param c Closure to be applied for loaded values.
- * @param expiryPlc Expiry policy.
- * @return Future with {@code True} value if loading took place.
- */
- public IgniteInternalFuture<Void> loadMissing(
- GridCacheContext cacheCtx,
- AffinityTopologyVersion topVer,
- boolean readThrough,
- boolean async,
- Collection<KeyCacheObject> keys,
- boolean skipVals,
- boolean needVer,
- boolean keepBinary,
- final ExpiryPolicy expiryPlc,
- GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
+ public boolean localFinish(boolean commit) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index df3bad2..d1334ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -56,7 +56,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
-import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
@@ -82,7 +81,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -127,7 +125,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** Slow tx warn timeout (initialized to 0). */
private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0);
- /** Tx salvage timeout (default 3s). */
+ /** Tx salvage timeout. */
private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
/** One phase commit deferred ack request timeout. */
@@ -138,9 +136,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
- /** Version in which deadlock detection introduced. */
- public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
-
/** Deadlock detection maximum iterations. */
static int DEADLOCK_MAX_ITERS =
IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
@@ -184,7 +179,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
PER_SEGMENT_Q);
/** Pending one phase commit ack requests sender. */
- private GridDeferredAckMessageSender deferredAckMessageSender;
+ private GridDeferredAckMessageSender deferredAckMsgSnd;
/** Transaction finish synchronizer. */
private GridCacheTxFinishSync txFinishSync;
@@ -206,54 +201,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private TxDeadlockDetection txDeadlockDetection;
/** {@inheritDoc} */
- @Override protected void onKernalStart0(boolean reconnect) {
- if (reconnect)
- return;
-
- cctx.gridEvents().addLocalEventListener(
- new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent;
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- UUID nodeId = discoEvt.eventNode().id();
-
- cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
-
- if (txFinishSync != null)
- txFinishSync.onNodeLeft(nodeId);
-
- for (TxDeadlockFuture fut : deadlockDetectFuts.values())
- fut.onNodeLeft(nodeId);
-
- for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
- Object obj = entry.getValue();
-
- if (obj instanceof GridCacheReturnCompletableWrapper &&
- nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
- removeTxReturn(entry.getKey());
- }
- }
- },
- EVT_NODE_FAILED, EVT_NODE_LEFT);
-
- this.txDeadlockDetection = new TxDeadlockDetection(cctx);
-
- cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
-
- for (IgniteInternalTx tx : idMap.values()) {
- if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) {
- if (log.isDebugEnabled())
- log.debug("Remaining transaction from left node: " + tx);
-
- salvageTx(tx, true, USER_FINISH);
- }
- }
- }
-
- /** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridIO().removeMessageListener(TOPIC_TX);
}
@@ -264,7 +211,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
txHnd = new IgniteTxHandler(cctx);
- deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+ deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
@Override public int getTimeout() {
return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
}
@@ -293,6 +240,40 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
}
};
+
+ cctx.gridEvents().addLocalEventListener(
+ new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt instanceof DiscoveryEvent;
+ assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID nodeId = discoEvt.eventNode().id();
+
+ // Wait some time in case there are some unprocessed messages from failed node.
+ cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
+
+ if (txFinishSync != null)
+ txFinishSync.onNodeLeft(nodeId);
+
+ for (TxDeadlockFuture fut : deadlockDetectFuts.values())
+ fut.onNodeLeft(nodeId);
+
+ for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof GridCacheReturnCompletableWrapper &&
+ nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
+ removeTxReturn(entry.getKey());
+ }
+ }
+ },
+ EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ this.txDeadlockDetection = new TxDeadlockDetection(cctx);
+
+ cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
}
/** {@inheritDoc} */
@@ -320,85 +301,35 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* Invalidates transaction.
*
* @param tx Transaction.
- * @return {@code True} if transaction was salvaged by this call.
*/
- public boolean salvageTx(IgniteInternalTx tx) {
- return salvageTx(tx, false, USER_FINISH);
+ public void salvageTx(IgniteInternalTx tx) {
+ salvageTx(tx, USER_FINISH);
}
/**
* Invalidates transaction.
*
* @param tx Transaction.
- * @param warn {@code True} if warning should be logged.
* @param status Finalization status.
- * @return {@code True} if transaction was salvaged by this call.
*/
- private boolean salvageTx(IgniteInternalTx tx, boolean warn, IgniteInternalTx.FinalizationStatus status) {
+ private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) {
assert tx != null;
TransactionState state = tx.state();
- if (state == ACTIVE || state == PREPARING || state == PREPARED) {
- try {
- if (!tx.markFinalizing(status)) {
- if (log.isDebugEnabled())
- log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
-
- return false;
- }
-
- tx.systemInvalidate(true);
-
- tx.prepare();
-
- if (tx.state() == PREPARING) {
- if (log.isDebugEnabled())
- log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
- "by another thread: " + tx);
-
- return false;
- }
-
- if (tx instanceof IgniteTxRemoteEx) {
- IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
- rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList());
- }
-
- tx.commit();
-
- if (warn) {
- // This print out cannot print any peer-deployed entity either
- // directly or indirectly.
- U.warn(log, "Invalidated transaction because originating node either " +
- "crashed or left grid: " + CU.txString(tx));
- }
- }
- catch (IgniteCheckedException ignore) {
+ if (state == ACTIVE || state == PREPARING || state == PREPARED || state == MARKED_ROLLBACK) {
+ if (!tx.markFinalizing(status)) {
if (log.isDebugEnabled())
- log.debug("Optimistic failure while invalidating transaction (will rollback): " +
- tx.xidVersion());
+ log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
- }
- }
- }
- else if (state == MARKED_ROLLBACK) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
+ return;
}
- }
- return true;
+ tx.salvageTx();
+
+ if (log.isDebugEnabled())
+ log.debug("Invalidated transaction because originating node left grid: " + CU.txString(tx));
+ }
}
/**
@@ -442,7 +373,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return {@code True} if transaction has been committed or rolled back,
* {@code false} otherwise.
*/
- public boolean isCompleted(IgniteInternalTx tx) {
+ private boolean isCompleted(IgniteInternalTx tx) {
boolean completed = completedVersHashMap.containsKey(tx.xidVersion());
// Need check that for tx with timeout rollback message was not received before lock.
@@ -461,7 +392,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param txSize Expected transaction size.
* @return New transaction.
*/
- public IgniteTxLocalAdapter newTx(
+ public GridNearTxLocal newTx(
boolean implicit,
boolean implicitSingle,
@Nullable GridCacheContext sysCacheCtx,
@@ -672,13 +603,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cctx Cache context.
* @return Transaction for current thread.
*/
- @SuppressWarnings({"unchecked"})
- public <T> T threadLocalTx(GridCacheContext cctx) {
+ public GridNearTxLocal threadLocalTx(GridCacheContext cctx) {
IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
- return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit() ? (T)tx : null;
+ if (tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit()) {
+ assert tx instanceof GridNearTxLocal : tx;
+
+ return (GridNearTxLocal)tx;
+ }
+
+ return null;
}
/**
@@ -747,48 +684,53 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Local transaction.
*/
@Nullable public IgniteInternalTx localTxx() {
- IgniteInternalTx tx = txx();
+ IgniteInternalTx tx = tx();
return tx != null && tx.local() ? tx : null;
}
/**
- * @return Transaction for current thread.
- */
- @SuppressWarnings({"unchecked"})
- public IgniteInternalTx txx() {
- return tx();
- }
-
- /**
* @return User transaction for current thread.
*/
- @Nullable public IgniteInternalTx userTx() {
+ @Nullable public GridNearTxLocal userTx() {
IgniteInternalTx tx = txContext();
- if (tx != null && tx.user() && tx.state() == ACTIVE)
- return tx;
+ if (activeUserTx(tx))
+ return (GridNearTxLocal)tx;
tx = tx(null, Thread.currentThread().getId());
- return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+ if (activeUserTx(tx))
+ return (GridNearTxLocal)tx;
+
+ return null;
}
/**
+ * @param cctx Cache context.
* @return User transaction for current thread.
*/
- @Nullable public IgniteInternalTx userTx(GridCacheContext cctx) {
+ @Nullable GridNearTxLocal userTx(GridCacheContext cctx) {
IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
- return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+ if (activeUserTx(tx))
+ return (GridNearTxLocal)tx;
+
+ return null;
}
/**
- * @return User transaction.
+ * @param tx Transaction.
+ * @return {@code True} if given transaction is explicitly started user transaction.
*/
- @SuppressWarnings({"unchecked"})
- @Nullable public <T extends IgniteTxLocalEx> T userTxx() {
- return (T)userTx();
+ private boolean activeUserTx(@Nullable IgniteInternalTx tx) {
+ if (tx != null && tx.user() && tx.state() == ACTIVE) {
+ assert tx instanceof GridNearTxLocal : tx;
+
+ return true;
+ }
+
+ return false;
}
/**
@@ -1241,7 +1183,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
unlockMultiple(tx, tx.readEntries());
// 6. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 7. Remove obsolete entries from cache.
removeObsolete(tx);
@@ -1314,7 +1256,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
unlockMultiple(tx, tx.readEntries());
// 4. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 5. Remove obsolete entries.
removeObsolete(tx);
@@ -1364,7 +1306,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (txIdMap.remove(tx.xidVersion(), tx)) {
// 1. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 2. Evict near entries.
if (!tx.readMap().isEmpty()) {
@@ -1400,7 +1342,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*
* @param tx Tx to uncommit.
*/
- public void uncommitTx(IgniteInternalTx tx) {
+ void uncommitTx(IgniteInternalTx tx) {
assert tx != null;
if (log.isDebugEnabled())
@@ -1417,15 +1359,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
unlockMultiple(tx, tx.readEntries());
// 3. Notify evictions.
- notifyEvitions(tx);
+ notifyEvictions(tx);
// 4. Remove from per-thread storage.
clearThreadMap(tx);
// 5. Unregister explicit locks.
- if (!tx.alternateVersions().isEmpty())
+ if (!tx.alternateVersions().isEmpty()) {
for (GridCacheVersion ver : tx.alternateVersions())
idMap.remove(ver);
+ }
// 6. Remove Near-2-DHT mappings.
if (tx instanceof GridCacheMappedVersion)
@@ -1481,7 +1424,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/**
* @param tx Transaction to notify evictions for.
*/
- private void notifyEvitions(IgniteInternalTx tx) {
+ private void notifyEvictions(IgniteInternalTx tx) {
if (tx.internal())
return;
@@ -1981,10 +1924,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return;
}
- if (tx instanceof GridDistributedTxRemoteAdapter) {
+ if (tx instanceof IgniteTxRemoteEx) {
IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
- rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(),
+ rmtTx.doneRemote(tx.xidVersion(),
+ Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList(),
Collections.<GridCacheVersion>emptyList());
}
@@ -2058,43 +2003,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
return;
}
- if (supportsDeadlockDetection(node)) {
- TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
+ TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
- try {
- if (!cctx.localNodeId().equals(nodeId))
- req.prepareMarshal(cctx);
-
- cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
- }
- catch (IgniteCheckedException e) {
- if (e instanceof ClusterTopologyCheckedException) {
- if (log.isDebugEnabled())
- log.debug("Failed to finish deadlock detection, node left: " + nodeId);
- }
- else
- U.warn(log, "Failed to finish deadlock detection: " + e, e);
+ try {
+ if (!cctx.localNodeId().equals(nodeId))
+ req.prepareMarshal(cctx);
- fut.onDone();
- }
+ cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
}
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node);
+ catch (IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to finish deadlock detection, node left: " + nodeId);
+ }
+ else
+ U.warn(log, "Failed to finish deadlock detection: " + e, e);
fut.onDone();
}
}
/**
- * @param node Node.
- * @return {@code True} if node supports deadlock detection protocol.
- */
- private boolean supportsDeadlockDetection(ClusterNode node) {
- return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0;
- }
-
- /**
* @param tx Tx.
* @param txKeys Tx keys.
* @return {@code True} if key is involved into tx.
@@ -2265,7 +2194,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param ver Version to ack.
*/
public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
- deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+ deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver);
}
/**
@@ -2314,9 +2243,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
", failedNodeId=" + evtNodeId + ']');
for (final IgniteInternalTx tx : txs()) {
- if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) {
+ if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) {
// Invalidate transactions.
- salvageTx(tx, false, RECOVERY_FINISH);
+ salvageTx(tx, RECOVERY_FINISH);
}
else {
// Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 8ceca3f..87cc7cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
@@ -25,6 +26,16 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
*/
public interface IgniteTxRemoteEx extends IgniteInternalTx {
/**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void commitRemoteTx() throws IgniteCheckedException;
+
+ /**
+ *
+ */
+ public void rollbackRemoteTx();
+
+ /**
* @param baseVer Base version.
* @param committedVers Committed version.
* @param rolledbackVers Rolled back version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 1c2ccbe..3c27bad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -88,7 +88,7 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
}
/** {@inheritDoc} */
- @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index c121b1b..822e44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -104,7 +104,7 @@ public interface IgniteTxState {
* @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with
* store enabled.
*/
- public boolean storeUsed(GridCacheSharedContext cctx);
+ public boolean storeWriteThrough(GridCacheSharedContext cctx);
/**
* @param cctx Context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 76751de..399eea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -289,14 +289,14 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
- @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
if (!activeCacheIds.isEmpty()) {
for (int i = 0; i < activeCacheIds.size(); i++) {
int cacheId = (int)activeCacheIds.get(i);
CacheStoreManager store = cctx.cacheContext(cacheId).store();
- if (store.configured())
+ if (store.configured() && store.isWriteThrough())
return true;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 6134b9f..8ffec00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -51,7 +52,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
/** Wrapped transaction. */
@GridToStringInclude
- private IgniteInternalTx tx;
+ private GridNearTxLocal tx;
/** Gateway. */
@GridToStringExclude
@@ -75,7 +76,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
* @param cctx Shared context.
* @param async Async flag.
*/
- public TransactionProxyImpl(IgniteInternalTx tx, GridCacheSharedContext<K, V> cctx, boolean async) {
+ public TransactionProxyImpl(GridNearTxLocal tx, GridCacheSharedContext<K, V> cctx, boolean async) {
assert tx != null;
assert cctx != null;
@@ -87,7 +88,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
/**
* @return Transaction.
*/
- public IgniteInternalTx tx() {
+ public GridNearTxLocal tx() {
return tx;
}
@@ -316,7 +317,9 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() {
@Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException {
- return fut.get().proxy();
+ fut.get();
+
+ return TransactionProxyImpl.this;
}
});
@@ -330,7 +333,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- tx = (IgniteInternalTx)in.readObject();
+ tx = (GridNearTxLocal)in.readObject();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 96644a3..0420182 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -67,7 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -95,8 +95,8 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_STAMPED;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
-import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -342,7 +342,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
// Check that sequence hasn't been created in other thread yet.
@@ -471,7 +471,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class);
// Check that atomic long hasn't been created in other thread yet.
@@ -551,7 +551,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (!create)
return c.applyx();
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
IgniteCheckedException err =
utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
@@ -623,7 +623,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
retryTopologySafe(new IgniteOutClosureX<Void>() {
@Override public Void applyx() throws IgniteCheckedException {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
T2<Boolean, IgniteCheckedException> res =
utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
@@ -682,7 +682,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue val = cast(dsView.get(key),
GridCacheAtomicReferenceValue.class);
@@ -786,7 +786,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue val = cast(dsView.get(key),
GridCacheAtomicStampedValue.class);
@@ -1033,7 +1033,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return retryTopologySafe(new IgniteOutClosureX<T>() {
@Override public T applyx() throws IgniteCheckedException {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
T2<String, IgniteCheckedException> res =
utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
@@ -1133,7 +1133,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
// Check that count down hasn't been created in other thread yet.
@@ -1198,7 +1198,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
GridCacheCountDownLatchValue val =
cast(dsView.get(key), GridCacheCountDownLatchValue.class);
@@ -1254,7 +1254,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
// Check that semaphore hasn't been created in other thread yet.
@@ -1319,7 +1319,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
@@ -1371,7 +1371,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
// Check that reentrant lock hasn't been created in other thread yet.
@@ -1438,7 +1438,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
@@ -1474,7 +1474,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
return CU.outTx(
new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
// Check correctness type of removable object.
R val = cast(dsView.get(key), cls);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index dfd2122..640b72d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -90,7 +90,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #incrementAndGet()}. */
private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -117,7 +117,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #getAndIncrement()}. */
private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -144,7 +144,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #decrementAndGet()}. */
private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -171,7 +171,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Callable for {@link #getAndDecrement()}. */
private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -430,7 +430,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalAddAndGet(final long l) {
return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -464,7 +464,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalGetAndAdd(final long l) {
return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -498,7 +498,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalGetAndSet(final long l) {
return new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
@@ -534,7 +534,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
return new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
if (val == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 448dd8b..6911b3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -213,7 +213,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
private Callable<Boolean> internalSet(final T val) {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
if (ref == null)
@@ -247,7 +247,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) {
return retryTopologySafe(new Callable<T>() {
@Override public T call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
if (ref == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 754d8f5..87aae8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -486,7 +486,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
private Callable<Long> internalUpdate(final long l, final boolean updated) {
return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = seqView.get(key);
checkRemoved();
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 6ac303c..14f80e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -267,7 +267,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
private Callable<Boolean> internalSet(final T val, final S stamp) {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
if (stmp == null)
@@ -305,7 +305,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
final IgniteClosure<S, S> newStampClos) {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
if (stmp == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 723fb55..45c3677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -32,7 +32,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -282,7 +282,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
internalLatch = CU.outTx(
retryTopologySafe(new Callable<CountDownLatch>() {
@Override public CountDownLatch call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue val = latchView.get(key);
if (val == null) {
@@ -407,7 +407,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public Integer call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue latchVal = latchView.get(key);
if (latchVal == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index 1cf78fa..5f0cb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -49,7 +49,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -520,8 +520,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
-
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null)
@@ -614,7 +613,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null)
@@ -711,7 +710,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null)
@@ -1089,7 +1088,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
sync = CU.outTx(
retryTopologySafe(new Callable<Sync>() {
@Override public Sync call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
if (val == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index a11c79d..a1c0515 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -285,7 +285,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return CU.outTx(
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx,
semView,
PESSIMISTIC, REPEATABLE_READ)
) {
@@ -359,7 +359,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (
- IgniteInternalTx tx = CU.txStartInternal(ctx,
+ GridNearTxLocal tx = CU.txStartInternal(ctx,
semView,
PESSIMISTIC, REPEATABLE_READ)
) {
@@ -454,7 +454,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
sync = CU.outTx(
retryTopologySafe(new Callable<Sync>() {
@Override public Sync call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx,
semView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheSemaphoreState val = semView.get(key);
@@ -465,7 +465,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
return null;
}
- final int count = val.getCount();
+ final int cnt = val.getCount();
Map<UUID, Integer> waiters = val.getWaiters();
@@ -473,7 +473,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
tx.commit();
- return new Sync(count, waiters, failoverSafe);
+ return new Sync(cnt, waiters, failoverSafe);
}
}
}),
@@ -676,7 +676,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
retryTopologySafe(new Callable<Integer>() {
@Override public Integer call() throws Exception {
try (
- IgniteInternalTx tx = CU.txStartInternal(ctx,
+ GridNearTxLocal tx = CU.txStartInternal(ctx,
semView, PESSIMISTIC, REPEATABLE_READ)
) {
GridCacheSemaphoreState val = semView.get(key);
@@ -684,11 +684,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
if (val == null)
throw new IgniteException("Failed to find semaphore with given name: " + name);
- int count = val.getCount();
+ int cnt = val.getCount();
tx.rollback();
- return count;
+ return cnt;
}
}
}),
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 7b80765..846eb69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -25,7 +25,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -57,7 +57,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
@Override public Boolean call() throws Exception {
boolean retVal;
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get();
if (idx != null) {
@@ -97,7 +97,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
T retVal;
while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
if (idx != null) {
@@ -143,7 +143,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
@Override public Boolean call() throws Exception {
boolean retVal;
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
if (idx != null) {
@@ -188,7 +188,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
try {
retryTopologySafe(new Callable<Object>() {
@Override public Object call() throws Exception {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
if (idx != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 74fc175..acd0a1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -17,7 +17,31 @@
package org.apache.ignite.internal.processors.igfs;
+import java.io.DataInput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
@@ -41,7 +65,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -58,31 +82,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import java.io.DataInput;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
@@ -654,7 +653,7 @@ public class IgfsDataManager extends IgfsManager {
// Need to check if block is partially written.
// If so, must update it in pessimistic transaction.
if (block.length != fileInfo.blockSize()) {
- try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key));
byte[] val = vals.get(colocatedKey);
@@ -1062,7 +1061,7 @@ public class IgfsDataManager extends IgfsManager {
IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null,
colocatedKey.evictExclude(), colocatedKey.blockId());
- try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
// Lock keys.
Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key));