You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/09 00:36:26 UTC
ignite git commit: IGNITE-264 - Addressing review comments.
Repository: ignite
Updated Branches:
refs/heads/ignite-264 1b737d702 -> 29ce3f94d
IGNITE-264 - Addressing review comments.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29ce3f94
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29ce3f94
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29ce3f94
Branch: refs/heads/ignite-264
Commit: 29ce3f94db38c611cdf43eaaade75c2266f4777c
Parents: 1b737d7
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 8 15:36:18 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 8 15:36:18 2015 -0700
----------------------------------------------------------------------
.../dht/GridDhtTransactionalCacheAdapter.java | 504 +++++++++----------
.../distributed/dht/GridDhtTxPrepareFuture.java | 24 +-
.../cache/transactions/IgniteTxHandler.java | 64 ++-
3 files changed, 276 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/29ce3f94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index d2c9613..b9514a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -709,325 +709,311 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
@Nullable final CacheEntryPredicate[] filter0) {
final List<KeyCacheObject> keys = req.keys();
- IgniteInternalFuture<Object> keyFut = null;
+ CacheEntryPredicate[] filter = filter0;
- if (keyFut == null)
- keyFut = new GridFinishedFuture<>();
+ // Set message into thread context.
+ GridDhtTxLocal tx = null;
- return new GridEmbeddedFuture<>(keyFut,
- new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
- @Override public IgniteInternalFuture<GridNearLockResponse> apply(Object o, Exception exx) {
- if (exx != null)
- return new GridDhtFinishedFuture<>(exx);
+ try {
+ int cnt = keys.size();
- CacheEntryPredicate[] filter = filter0;
+ if (req.inTx()) {
+ GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
- // Set message into thread context.
- GridDhtTxLocal tx = null;
+ if (dhtVer != null)
+ tx = ctx.tm().tx(dhtVer);
+ }
- try {
- int cnt = keys.size();
+ final List<GridCacheEntryEx> entries = new ArrayList<>(cnt);
- if (req.inTx()) {
- GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
+ // Unmarshal filter first.
+ if (filter == null)
+ filter = req.filter();
- if (dhtVer != null)
- tx = ctx.tm().tx(dhtVer);
- }
+ GridDhtLockFuture fut = null;
- final List<GridCacheEntryEx> entries = new ArrayList<>(cnt);
+ if (!req.inTx()) {
+ GridDhtPartitionTopology top = null;
- // Unmarshal filter first.
- if (filter == null)
- filter = req.filter();
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
- GridDhtLockFuture fut = null;
+ top = topology();
- if (!req.inTx()) {
- GridDhtPartitionTopology top = null;
+ topology().readLock();
+ }
- if (req.firstClientRequest()) {
- assert CU.clientNode(nearNode);
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
+ }
- top = topology();
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
- topology().readLock();
- }
+ return new GridFinishedFuture<>(res);
+ }
- try {
- if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
- if (log.isDebugEnabled()) {
- log.debug("Client topology version mismatch, need remap lock request [" +
- "reqTopVer=" + req.topologyVersion() +
- ", locTopVer=" + top.topologyVersion() +
- ", req=" + req + ']');
- }
+ fut = new GridDhtLockFuture(ctx,
+ nearNode.id(),
+ req.version(),
+ req.topologyVersion(),
+ cnt,
+ req.txRead(),
+ req.needReturnValue(),
+ req.timeout(),
+ tx,
+ req.threadId(),
+ req.accessTtl(),
+ filter,
+ req.skipStore());
+
+ // Add before mapping.
+ if (!ctx.mvcc().addFuture(fut))
+ throw new IllegalStateException("Duplicate future ID: " + fut);
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
+ }
- GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
- req,
- top.topologyVersion());
+ boolean timedout = false;
- return new GridFinishedFuture<>(res);
- }
+ for (KeyCacheObject key : keys) {
+ if (timedout)
+ break;
- fut = new GridDhtLockFuture(ctx,
- nearNode.id(),
- req.version(),
- req.topologyVersion(),
- cnt,
- req.txRead(),
- req.needReturnValue(),
- req.timeout(),
- tx,
- req.threadId(),
- req.accessTtl(),
- filter,
- req.skipStore());
+ while (true) {
+ // Specify topology version to make sure containment is checked
+ // based on the requested version, not the latest.
+ GridDhtCacheEntry entry = entryExx(key, req.topologyVersion());
- // Add before mapping.
- if (!ctx.mvcc().addFuture(fut))
- throw new IllegalStateException("Duplicate future ID: " + fut);
- }
- finally {
- if (top != null)
- top.readUnlock();
- }
- }
+ try {
+ if (fut != null) {
+ // This method will add local candidate.
+ // Entry cannot become obsolete after this method succeeded.
+ fut.addEntry(key == null ? null : entry);
- boolean timedout = false;
+ if (fut.isDone()) {
+ timedout = true;
- for (KeyCacheObject key : keys) {
- if (timedout)
break;
+ }
+ }
- while (true) {
- // Specify topology version to make sure containment is checked
- // based on the requested version, not the latest.
- GridDhtCacheEntry entry = entryExx(key, req.topologyVersion());
+ entries.add(entry);
- try {
- if (fut != null) {
- // This method will add local candidate.
- // Entry cannot become obsolete after this method succeeded.
- fut.addEntry(key == null ? null : entry);
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry when adding lock (will retry): " + entry);
+ }
+ catch (GridDistributedLockCancelledException e) {
+ if (log.isDebugEnabled())
+ log.debug("Got lock request for cancelled lock (will ignore): " +
+ entry);
- if (fut.isDone()) {
- timedout = true;
+ fut.onError(e);
- break;
- }
- }
+ return new GridDhtFinishedFuture<>(e);
+ }
+ }
+ }
- entries.add(entry);
+ // Handle implicit locks for pessimistic transactions.
+ if (req.inTx()) {
+ if (tx == null) {
+ GridDhtPartitionTopology top = null;
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry when adding lock (will retry): " + entry);
- }
- catch (GridDistributedLockCancelledException e) {
- if (log.isDebugEnabled())
- log.debug("Got lock request for cancelled lock (will ignore): " +
- entry);
+ if (req.firstClientRequest()) {
+ assert CU.clientNode(nearNode);
- fut.onError(e);
+ top = topology();
- return new GridDhtFinishedFuture<>(e);
- }
+ topology().readLock();
+ }
+
+ try {
+ if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Client topology version mismatch, need remap lock request [" +
+ "reqTopVer=" + req.topologyVersion() +
+ ", locTopVer=" + top.topologyVersion() +
+ ", req=" + req + ']');
}
- }
- // Handle implicit locks for pessimistic transactions.
- if (req.inTx()) {
- if (tx == null) {
- GridDhtPartitionTopology top = null;
+ GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+ req,
+ top.topologyVersion());
- if (req.firstClientRequest()) {
- assert CU.clientNode(nearNode);
+ return new GridFinishedFuture<>(res);
+ }
- top = topology();
+ tx = new GridDhtTxLocal(
+ ctx.shared(),
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ req.implicitTx(),
+ req.implicitSingleTx(),
+ ctx.systemTx(),
+ false,
+ ctx.ioPolicy(),
+ PESSIMISTIC,
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ !req.skipStore(),
+ false,
+ req.txSize(),
+ null,
+ req.subjectId(),
+ req.taskNameHash());
- topology().readLock();
- }
+ tx.syncCommit(req.syncCommit());
- try {
- if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) {
- if (log.isDebugEnabled()) {
- log.debug("Client topology version mismatch, need remap lock request [" +
- "reqTopVer=" + req.topologyVersion() +
- ", locTopVer=" + top.topologyVersion() +
- ", req=" + req + ']');
- }
+ tx = ctx.tm().onCreated(null, tx);
- GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
- req,
- top.topologyVersion());
+ if (tx == null || !tx.init()) {
+ String msg = "Failed to acquire lock (transaction has been completed): " +
+ req.version();
- return new GridFinishedFuture<>(res);
- }
+ U.warn(log, msg);
- tx = new GridDhtTxLocal(
- ctx.shared(),
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- req.implicitTx(),
- req.implicitSingleTx(),
- ctx.systemTx(),
- false,
- ctx.ioPolicy(),
- PESSIMISTIC,
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- !req.skipStore(),
- false,
- req.txSize(),
- null,
- req.subjectId(),
- req.taskNameHash());
-
- tx.syncCommit(req.syncCommit());
-
- tx = ctx.tm().onCreated(null, tx);
-
- if (tx == null || !tx.init()) {
- String msg = "Failed to acquire lock (transaction has been completed): " +
- req.version();
-
- U.warn(log, msg);
-
- if (tx != null)
- tx.rollback();
-
- return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
- }
+ if (tx != null)
+ tx.rollback();
- tx.topologyVersion(req.topologyVersion());
- }
- finally {
- if (top != null)
- top.readUnlock();
- }
- }
+ return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+ }
- ctx.tm().txContext(tx);
+ tx.topologyVersion(req.topologyVersion());
+ }
+ finally {
+ if (top != null)
+ top.readUnlock();
+ }
+ }
- if (log.isDebugEnabled())
- log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']');
+ ctx.tm().txContext(tx);
- IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(
- cacheCtx,
+ if (log.isDebugEnabled())
+ log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']');
+
+ IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(
+ cacheCtx,
+ entries,
+ req.messageId(),
+ req.txRead(),
+ req.needReturnValue(),
+ req.accessTtl(),
+ req.skipStore());
+
+ final GridDhtTxLocal t = tx;
+
+ return new GridDhtEmbeddedFuture(
+ txFut,
+ new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
+ @Override public IgniteInternalFuture<GridNearLockResponse> apply(
+ GridCacheReturn o, Exception e) {
+ if (e != null)
+ e = U.unwrap(e);
+
+ assert !t.empty();
+
+ // Create response while holding locks.
+ final GridNearLockResponse resp = createLockReply(nearNode,
entries,
- req.messageId(),
- req.txRead(),
- req.needReturnValue(),
- req.accessTtl(),
- req.skipStore());
+ req,
+ t,
+ t.xidVersion(),
+ e);
+
+ if (resp.error() == null && t.onePhaseCommit()) {
+ assert t.implicit();
+
+ return t.commitAsync().chain(
+ new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
+ @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ try {
+ // Check for error.
+ f.get();
+ }
+ catch (IgniteCheckedException e1) {
+ resp.error(e1);
+ }
- final GridDhtTxLocal t = tx;
-
- return new GridDhtEmbeddedFuture(
- txFut,
- new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() {
- @Override public IgniteInternalFuture<GridNearLockResponse> apply(
- GridCacheReturn o, Exception e) {
- if (e != null)
- e = U.unwrap(e);
-
- assert !t.empty();
-
- // Create response while holding locks.
- final GridNearLockResponse resp = createLockReply(nearNode,
- entries,
- req,
- t,
- t.xidVersion(),
- e);
-
- if (resp.error() == null && t.onePhaseCommit()) {
- assert t.implicit();
-
- return t.commitAsync().chain(
- new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
- @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) {
- try {
- // Check for error.
- f.get();
- }
- catch (IgniteCheckedException e1) {
- resp.error(e1);
- }
-
- sendLockReply(nearNode, t, req, resp);
-
- return resp;
- }
- });
- }
- else {
sendLockReply(nearNode, t, req, resp);
- return new GridFinishedFuture<>(resp);
+ return resp;
}
- }
- }
- );
+ });
+ }
+ else {
+ sendLockReply(nearNode, t, req, resp);
+
+ return new GridFinishedFuture<>(resp);
+ }
}
- else {
- assert fut != null;
+ }
+ );
+ }
+ else {
+ assert fut != null;
- // This will send remote messages.
- fut.map();
+ // This will send remote messages.
+ fut.map();
- final GridCacheVersion mappedVer = fut.version();
+ final GridCacheVersion mappedVer = fut.version();
- return new GridDhtEmbeddedFuture<>(
- new C2<Boolean, Exception, GridNearLockResponse>() {
- @Override public GridNearLockResponse apply(Boolean b, Exception e) {
- if (e != null)
- e = U.unwrap(e);
- else if (!b)
- e = new GridCacheLockTimeoutException(req.version());
+ return new GridDhtEmbeddedFuture<>(
+ new C2<Boolean, Exception, GridNearLockResponse>() {
+ @Override public GridNearLockResponse apply(Boolean b, Exception e) {
+ if (e != null)
+ e = U.unwrap(e);
+ else if (!b)
+ e = new GridCacheLockTimeoutException(req.version());
- GridNearLockResponse res = createLockReply(nearNode,
- entries,
- req,
- null,
- mappedVer,
- e);
+ GridNearLockResponse res = createLockReply(nearNode,
+ entries,
+ req,
+ null,
+ mappedVer,
+ e);
- sendLockReply(nearNode, null, req, res);
+ sendLockReply(nearNode, null, req, res);
- return res;
- }
- },
- fut);
+ return res;
}
- }
- catch (IgniteCheckedException e) {
- String err = "Failed to unmarshal at least one of the keys for lock request message: " + req;
-
- U.error(log, err, e);
+ },
+ fut);
+ }
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ String err = "Failed to unmarshal at least one of the keys for lock request message: " + req;
- if (tx != null) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to rollback the transaction: " + tx, ex);
- }
- }
+ U.error(log, err, e);
- return new GridDhtFinishedFuture<>(
- new IgniteCheckedException(err, e));
- }
+ if (tx != null) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to rollback the transaction: " + tx, ex);
}
}
- );
+
+ return new GridDhtFinishedFuture<>(
+ new IgniteCheckedException(err, e));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/29ce3f94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 5b05698..89fc0ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -339,15 +339,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
- CacheObject val;
-
cached.unswap(retVal);
boolean readThrough = (retVal || hasFilters) &&
cacheCtx.config().isLoadPreviousValue() &&
!txEntry.skipStore();
- val = cached.innerGet(
+ CacheObject val = cached.innerGet(
tx,
/*swap*/true,
readThrough,
@@ -688,26 +686,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
/**
- * Checks if transaction involves a near-enabled cache on originating node.
- *
- * @return {@code True} if originating node has a near cache enabled and that cache participates in
- * the transaction.
- */
- private boolean originatingNodeHasNearCache() {
- ClusterNode node = cctx.discovery().node(tx.originatingNodeId());
-
- if (node == null)
- return false;
-
- for (int cacheId : tx.activeCacheIds()) {
- if (cctx.discovery().cacheNearNode(node, cctx.cacheContext(cacheId).name()))
- return true;
- }
-
- return false;
- }
-
- /**
* @param res Response being sent.
*/
private void addDhtValues(GridNearTxPrepareResponse res) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/29ce3f94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 5ef5629..9efa43a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -59,10 +59,8 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
@@ -213,42 +211,40 @@ public class IgniteTxHandler {
final GridNearTxLocal locTx,
final GridNearTxPrepareRequest req
) {
- IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
-
- return new GridEmbeddedFuture<>(
- fut,
- new C2<Object, Exception, IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public IgniteInternalFuture<GridNearTxPrepareResponse> apply(Object o, Exception ex) {
- if (ex != null)
- throw new GridClosureException(ex);
-
- IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
- req.reads(),
- req.writes(),
- req.transactionNodes(),
- req.last(),
- req.lastBackups());
-
- if (locTx.isRollbackOnly())
- locTx.rollbackAsync();
-
- return fut;
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
+ req.reads(),
+ req.writes(),
+ req.transactionNodes(),
+ req.last(),
+ req.lastBackups());
+
+ if (locTx.isRollbackOnly())
+ locTx.rollbackAsync();
+
+ return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
+ @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
+ try {
+ return f.get();
}
- },
- new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>() {
- @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse res, Exception e) {
- if (e != null) {
- locTx.setRollbackOnly(); // Just in case.
+ catch (Exception e) {
+ locTx.setRollbackOnly(); // Just in case.
- if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
- !X.hasCause(e, IgniteFutureCancelledException.class))
- U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
- }
+ if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+ !X.hasCause(e, IgniteFutureCancelledException.class))
+ U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
- return res;
+ return new GridNearTxPrepareResponse(
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ e,
+ null);
}
}
- );
+ });
}
/**