You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/16 09:07:40 UTC
[5/8] incubator-ignite git commit: Revert "ignite-471: fixed NPE in
PortableMarshaller"
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
deleted file mode 100644
index 4f74303..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ /dev/null
@@ -1,768 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.transactions.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
-import static org.apache.ignite.transactions.TransactionState.*;
-
-/**
- *
- */
-public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter
- implements GridCacheMvccFuture<IgniteInternalTx> {
- /** */
- private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
-
- /**
- * @param cctx Context.
- * @param tx Transaction.
- */
- public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
- super(cctx, tx);
-
- assert tx.optimistic() : tx;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
- if (log.isDebugEnabled())
- log.debug("Transaction future received owner changed callback: " + entry);
-
- if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
- lockKeys.remove(entry.txKey());
-
- // This will check for locks.
- onDone();
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- boolean found = false;
-
- for (IgniteInternalFuture<?> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture) fut;
-
- if (f.node().id().equals(nodeId)) {
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
-
- found = true;
- }
- }
- }
-
- return found;
- }
-
- /**
- * @param nodeId Failed node ID.
- * @param mappings Remaining mappings.
- * @param e Error.
- */
- void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
- if (err.compareAndSet(null, e)) {
- boolean marked = tx.setRollbackOnly();
-
- if (e instanceof IgniteTxOptimisticCheckedException) {
- assert nodeId != null : "Missing node ID for optimistic failure exception: " + e;
-
- tx.removeKeysMapping(nodeId, mappings);
- }
-
- if (e instanceof IgniteTxRollbackCheckedException) {
- if (marked) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
- }
- }
- }
-
- onComplete();
- }
- }
-
- /**
- * @return {@code True} if all locks are owned.
- */
- private boolean checkLocks() {
- boolean locked = lockKeys.isEmpty();
-
- if (locked) {
- if (log.isDebugEnabled())
- log.debug("All locks are acquired for near prepare future: " + this);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
- }
-
- return locked;
- }
-
- /** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
- if (!isDone()) {
- for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
-
- f.onResult(nodeId, res);
- }
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
- // If locks were not acquired yet, delay completion.
- if (isDone() || (err == null && !checkLocks()))
- return false;
-
- this.err.compareAndSet(null, err);
-
- if (err == null)
- tx.state(PREPARED);
-
- if (super.onDone(tx, err)) {
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
- }
-
- /**
- * Completeness callback.
- */
- private void onComplete() {
- if (super.onDone(tx, err.get()))
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
- }
-
- /** {@inheritDoc} */
- @Override public void prepare() {
- // Obtain the topology version to use.
- AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
-
- if (topVer != null) {
- tx.topologyVersion(topVer);
-
- prepare0();
-
- return;
- }
-
- prepareOnTopology();
- }
-
- /**
- *
- */
- private void prepareOnTopology() {
- GridDhtTopologyFuture topFut = topologyReadLock();
-
- try {
- if (topFut == null) {
- assert isDone();
-
- return;
- }
-
- if (topFut.isDone()) {
- StringBuilder invalidCaches = new StringBuilder();
-
- boolean cacheInvalid = false;
-
- for (GridCacheContext ctx : cctx.cacheContexts()) {
- if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) {
- if (cacheInvalid)
- invalidCaches.append(", ");
-
- invalidCaches.append(U.maskName(ctx.name()));
-
- cacheInvalid = true;
- }
- }
-
- if (cacheInvalid) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- invalidCaches.toString()));
-
- return;
- }
-
- tx.topologyVersion(topFut.topologyVersion());
-
- prepare0();
- }
- else {
- topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
- @Override public void run() {
- prepareOnTopology();
- }
- });
- }
- });
- }
- }
- finally {
- topologyReadUnlock();
- }
- }
-
- /**
- * Acquires topology read lock.
- *
- * @return Topology ready future.
- */
- private GridDhtTopologyFuture topologyReadLock() {
- if (tx.activeCacheIds().isEmpty())
- return cctx.exchange().lastTopologyFuture();
-
- GridCacheContext<?, ?> nonLocCtx = null;
-
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- if (!cacheCtx.isLocal()) {
- nonLocCtx = cacheCtx;
-
- break;
- }
- }
-
- if (nonLocCtx == null)
- return cctx.exchange().lastTopologyFuture();
-
- nonLocCtx.topology().readLock();
-
- if (nonLocCtx.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- nonLocCtx.name()));
-
- return null;
- }
-
- return nonLocCtx.topology().topologyVersionFuture();
- }
-
- /**
- * Releases topology read lock.
- */
- private void topologyReadUnlock() {
- if (!tx.activeCacheIds().isEmpty()) {
- GridCacheContext<?, ?> nonLocCtx = null;
-
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- if (!cacheCtx.isLocal()) {
- nonLocCtx = cacheCtx;
-
- break;
- }
- }
-
- if (nonLocCtx != null)
- nonLocCtx.topology().readUnlock();
- }
- }
-
- /**
- * Initializes future.
- */
- private void prepare0() {
- try {
- if (!tx.state(PREPARING)) {
- if (tx.setRollbackOnly()) {
- if (tx.timedOut())
- onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
- "was rolled back: " + this));
- else
- onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
- }
- else
- onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
- "prepare [state=" + tx.state() + ", tx=" + this + ']'));
-
- return;
- }
-
- // Make sure to add future before calling prepare.
- cctx.mvcc().addFuture(this);
-
- prepare(
- tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
- tx.writeEntries());
-
- markInitialized();
- }
- catch (TransactionTimeoutException | TransactionOptimisticException e) {
- onError(cctx.localNodeId(), null, e);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
-
- /**
- * @param reads Read entries.
- * @param writes Write entries.
- * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node.
- */
- private void prepare(
- Iterable<IgniteTxEntry> reads,
- Iterable<IgniteTxEntry> writes
- ) throws IgniteCheckedException {
- AffinityTopologyVersion topVer = tx.topologyVersion();
-
- assert topVer.topologyVersion() > 0;
-
- txMapping = new GridDhtTxMapping();
-
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
-
- if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
- onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " +
- "partition nodes left the grid): " + cacheCtx.name()));
-
- return;
- }
- }
- }
-
- // Assign keys to primary nodes.
- GridDistributedTxMapping cur = null;
-
- for (IgniteTxEntry read : reads) {
- GridDistributedTxMapping updated = map(read, topVer, cur, false);
-
- if (cur != updated) {
- mappings.offer(updated);
-
- if (updated.node().isLocal()) {
- if (read.context().isNear())
- tx.nearLocallyMapped(true);
- else if (read.context().isColocated())
- tx.colocatedLocallyMapped(true);
- }
-
- cur = updated;
- }
- }
-
- for (IgniteTxEntry write : writes) {
- GridDistributedTxMapping updated = map(write, topVer, cur, true);
-
- if (cur != updated) {
- mappings.offer(updated);
-
- if (updated.node().isLocal()) {
- if (write.context().isNear())
- tx.nearLocallyMapped(true);
- else if (write.context().isColocated())
- tx.colocatedLocallyMapped(true);
- }
-
- cur = updated;
- }
- }
-
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done: " + this);
-
- return;
- }
-
- tx.addEntryMapping(mappings);
-
- cctx.mvcc().recheckPendingLocks();
-
- txMapping.initLast(mappings);
-
- tx.transactionNodes(txMapping.transactionNodes());
-
- checkOnePhase();
-
- proceedPrepare(mappings);
- }
-
- /**
- * Continues prepare after previous mapping successfully finished.
- *
- * @param mappings Queue of mappings.
- */
- private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
- if (isDone())
- return;
-
- final GridDistributedTxMapping m = mappings.poll();
-
- if (m == null)
- return;
-
- assert !m.empty();
-
- final ClusterNode n = m.node();
-
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- tx.optimistic() && tx.serializable() ? m.reads() : null,
- m.writes(),
- m.near(),
- txMapping.transactionNodes(),
- m.last(),
- m.lastBackups(),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash());
-
- for (IgniteTxEntry txEntry : m.writes()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
-
- // Must lock near entries separately.
- if (m.near()) {
- try {
- tx.optimisticLockEntries(req.writes());
-
- tx.userPrepare();
- }
- catch (IgniteCheckedException e) {
- onError(null, null, e);
- }
- }
-
- final MiniFuture fut = new MiniFuture(m, mappings);
-
- req.miniId(fut.futureId());
-
- add(fut); // Append new future.
-
- // If this is the primary node for the keys.
- if (n.isLocal()) {
- // At this point, if any new node joined, then it is
- // waiting for this transaction to complete, so
- // partition reassignments are not possible here.
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
-
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(n.id(), prepFut.get());
- }
- catch (IgniteCheckedException e) {
- fut.onResult(e);
- }
- }
- });
- }
- else {
- try {
- cctx.io().send(n, req, tx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- // Fail the whole thing.
- fut.onResult(e);
- }
- }
- }
-
- /**
- * @param entry Transaction entry.
- * @param topVer Topology version.
- * @param cur Current mapping.
- * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
- * @return Mapping.
- */
- private GridDistributedTxMapping map(
- IgniteTxEntry entry,
- AffinityTopologyVersion topVer,
- GridDistributedTxMapping cur,
- boolean waitLock
- ) throws IgniteCheckedException {
- GridCacheContext cacheCtx = entry.context();
-
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
-
- txMapping.addMapping(nodes);
-
- ClusterNode primary = F.first(nodes);
-
- assert primary != null;
-
- if (log.isDebugEnabled()) {
- log.debug("Mapped key to primary node [key=" + entry.key() +
- ", part=" + cacheCtx.affinity().partition(entry.key()) +
- ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
- }
-
- // Must re-initialize cached entry while holding topology lock.
- if (cacheCtx.isNear())
- entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
- else if (!cacheCtx.isLocal())
- entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
- else
- entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
-
- if (cacheCtx.isNear() || cacheCtx.isLocal()) {
- if (waitLock && entry.explicitVersion() == null)
- lockKeys.add(entry.txKey());
- }
-
- if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
- cur = new GridDistributedTxMapping(primary);
-
- // Initialize near flag right away.
- cur.near(cacheCtx.isNear());
- }
-
- cur.add(entry);
-
- if (entry.explicitVersion() != null) {
- tx.markExplicit(primary.id());
-
- cur.markExplicitLock();
- }
-
- entry.nodeId(primary.id());
-
- if (cacheCtx.isNear()) {
- while (true) {
- try {
- GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
-
- cached.dhtNodeId(tx.xidVersion(), primary.id());
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- entry.cached(cacheCtx.near().entryEx(entry.key()));
- }
- }
- }
-
- return cur;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
- @Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() +
- ", loc=" + ((MiniFuture)f).node().isLocal() +
- ", done=" + f.isDone() + "]";
- }
- });
-
- return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
- "futs", futs,
- "super", super.toString());
- }
-
- /**
- *
- */
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Keys. */
- @GridToStringInclude
- private GridDistributedTxMapping m;
-
- /** Flag to signal some result being processed. */
- private AtomicBoolean rcvRes = new AtomicBoolean(false);
-
- /** Mappings to proceed prepare. */
- private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
-
- /**
- * @param m Mapping.
- * @param mappings Queue of mappings to proceed with.
- */
- MiniFuture(
- GridDistributedTxMapping m,
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
- ) {
- this.m = m;
- this.mappings = mappings;
- }
-
- /**
- * @return Future ID.
- */
- IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Node ID.
- */
- public ClusterNode node() {
- return m.node();
- }
-
- /**
- * @return Keys.
- */
- public GridDistributedTxMapping mapping() {
- return m;
- }
-
- /**
- * @param e Error.
- */
- void onResult(Throwable e) {
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
- }
- else
- U.warn(log, "Received error after another result has been processed [fut=" +
- GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e);
- }
-
- /**
- * @param e Node failure.
- */
- void onResult(ClusterTopologyCheckedException e) {
- if (isDone())
- return;
-
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
-
- // Fail the whole future (make sure not to remap on different primary node
- // to prevent multiple lock coordinators).
- onError(null, null, e);
- }
- }
-
- /**
- * @param nodeId Failed node ID.
- * @param res Result callback.
- */
- void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
- if (isDone())
- return;
-
- if (rcvRes.compareAndSet(false, true)) {
- if (res.error() != null) {
- // Fail the whole compound future.
- onError(nodeId, mappings, res.error());
- }
- else {
- onPrepareResponse(m, res);
-
- // Proceed prepare before finishing mini future.
- if (mappings != null)
- proceedPrepare(mappings);
-
- // Finish this mini future.
- onDone(tx);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
deleted file mode 100644
index bce62c1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.transactions.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
-import static org.apache.ignite.transactions.TransactionState.*;
-
-/**
- *
- */
-public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter {
- /**
- * @param cctx Context.
- * @param tx Transaction.
- */
- public GridNearPessimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
- super(cctx, tx);
-
- assert tx.pessimistic() : tx;
-
- // Should wait for all mini futures completion before finishing tx.
- ignoreChildFailures(IgniteCheckedException.class);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- return ((MiniFuture)f).node();
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- boolean found = false;
-
- for (IgniteInternalFuture<?> fut : futures()) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.node().id().equals(nodeId)) {
- f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
-
- found = true;
- }
- }
-
- return found;
- }
-
- /** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
- if (!isDone()) {
- for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
-
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f);
-
- f.onResult(res);
- }
- }
- }
- }
- /** {@inheritDoc} */
- @Override public void prepare() {
- if (!tx.state(PREPARING)) {
- if (tx.setRollbackOnly()) {
- if (tx.timedOut())
- onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx));
- else
- onDone(new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
- }
- else
- onDone(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
-
- return;
- }
-
- try {
- tx.userPrepare();
-
- cctx.mvcc().addFuture(this);
-
- preparePessimistic();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
-
- /**
- *
- */
- private void preparePessimistic() {
- Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
-
- AffinityTopologyVersion topVer = tx.topologyVersion();
-
- txMapping = new GridDhtTxMapping();
-
- for (IgniteTxEntry txEntry : tx.allEntries()) {
- GridCacheContext cacheCtx = txEntry.context();
-
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
-
- ClusterNode primary = F.first(nodes);
-
- boolean near = cacheCtx.isNear();
-
- IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
-
- GridDistributedTxMapping nodeMapping = mappings.get(key);
-
- if (nodeMapping == null) {
- nodeMapping = new GridDistributedTxMapping(primary);
-
- nodeMapping.near(cacheCtx.isNear());
-
- mappings.put(key, nodeMapping);
- }
-
- txEntry.nodeId(primary.id());
-
- nodeMapping.add(txEntry);
-
- txMapping.addMapping(nodes);
- }
-
- tx.transactionNodes(txMapping.transactionNodes());
-
- checkOnePhase();
-
- for (final GridDistributedTxMapping m : mappings.values()) {
- final ClusterNode node = m.node();
-
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- m.reads(),
- m.writes(),
- m.near(),
- txMapping.transactionNodes(),
- true,
- txMapping.transactionNodes().get(node.id()),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash());
-
- for (IgniteTxEntry txEntry : m.writes()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
-
- final MiniFuture fut = new MiniFuture(m);
-
- req.miniId(fut.futureId());
-
- add(fut);
-
- if (node.isLocal()) {
- IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(node.id(),
- tx,
- req);
-
- prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
- @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
- try {
- fut.onResult(prepFut.get());
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
- }
- });
- }
- else {
- try {
- cctx.io().send(node, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException e) {
- fut.onNodeLeft(e);
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
- }
- }
-
- markInitialized();
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) {
- if (err != null)
- this.err.compareAndSet(null, err);
-
- err = this.err.get();
-
- if (err == null)
- tx.state(PREPARED);
-
- if (super.onDone(tx, err)) {
- cctx.mvcc().removeFuture(this);
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
- @Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() +
- ", loc=" + ((MiniFuture)f).node().isLocal() +
- ", done=" + f.isDone() + "]";
- }
- });
-
- return S.toString(GridNearPessimisticTxPrepareFuture.class, this,
- "futs", futs,
- "super", super.toString());
- }
-
- /**
- *
- */
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** */
- private GridDistributedTxMapping m;
-
- /**
- * @param m Mapping.
- */
- MiniFuture(GridDistributedTxMapping m) {
- this.m = m;
- }
-
- /**
- * @return Future ID.
- */
- IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Node ID.
- */
- public ClusterNode node() {
- return m.node();
- }
-
- /**
- * @param res Response.
- */
- void onResult(GridNearTxPrepareResponse res) {
- if (res.error() != null)
- onError(res.error());
- else {
- onPrepareResponse(m, res);
-
- onDone(tx);
- }
- }
-
- /**
- * @param e Error.
- */
- void onNodeLeft(ClusterTopologyCheckedException e) {
- onError(e);
- }
-
- /**
- * @param e Error.
- */
- void onError(Throwable e) {
- if (isDone()) {
- U.warn(log, "Received error when future is done [fut=" + this + ", err=" + e + ", tx=" + tx + ']');
-
- return;
- }
-
- if (log.isDebugEnabled())
- log.debug("Error on tx prepare [fut=" + this + ", err=" + e + ", tx=" + tx + ']');
-
- if (err.compareAndSet(null, e))
- tx.setRollbackOnly();
-
- onDone(e);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index df7a65f..581c7e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -301,10 +301,14 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
req.isInvalidate(),
req.timeout(),
req.txSize(),
+ req.groupLockKey(),
req.subjectId(),
req.taskNameHash()
);
+ if (req.groupLock())
+ tx.groupLockKey(txKey);
+
tx = ctx.tm().onCreated(null, tx);
if (tx == null || !ctx.tm().onStarted(tx))
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index b44f821..7b0b811 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -97,7 +97,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
@Nullable UUID subjId,
int taskNameHash) {
super(xidVer, futId, null, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
- committedVers, rolledbackVers, txSize);
+ committedVers, rolledbackVers, txSize, null);
this.explicitLock = explicitLock;
this.storeEnabled = storeEnabled;
@@ -170,37 +170,37 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
}
switch (writer.state()) {
- case 19:
+ case 20:
if (!writer.writeBoolean("explicitLock", explicitLock))
return false;
writer.incrementState();
- case 20:
+ case 21:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 21:
+ case 22:
if (!writer.writeBoolean("storeEnabled", storeEnabled))
return false;
writer.incrementState();
- case 22:
+ case 23:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 23:
+ case 24:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 24:
+ case 25:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -222,7 +222,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
return false;
switch (reader.state()) {
- case 19:
+ case 20:
explicitLock = reader.readBoolean("explicitLock");
if (!reader.isLastRead())
@@ -230,7 +230,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 20:
+ case 21:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -238,7 +238,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
+ case 22:
storeEnabled = reader.readBoolean("storeEnabled");
if (!reader.isLastRead())
@@ -246,7 +246,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 22:
+ case 23:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -254,7 +254,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 23:
+ case 24:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -262,7 +262,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 24:
+ case 25:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -282,7 +282,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 26;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 5c426ed..c665354 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -61,7 +61,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** Future. */
@GridToStringExclude
- private final AtomicReference<IgniteInternalFuture<?>> prepFut = new AtomicReference<>();
+ private final AtomicReference<IgniteInternalFuture<IgniteInternalTx>> prepFut =
+ new AtomicReference<>();
/** */
@GridToStringExclude
@@ -102,6 +103,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @param timeout Timeout.
* @param storeEnabled Store enabled flag.
* @param txSize Transaction size.
+ * @param grpLockKey Group lock key if this is a group lock transaction.
+ * @param partLock {@code True} if this is a group-lock transaction and the whole partition should be locked.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
*/
@@ -116,6 +119,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
long timeout,
boolean storeEnabled,
int txSize,
+ @Nullable IgniteTxKey grpLockKey,
+ boolean partLock,
@Nullable UUID subjId,
int taskNameHash
) {
@@ -133,6 +138,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
false,
storeEnabled,
txSize,
+ grpLockKey,
+ partLock,
subjId,
taskNameHash);
@@ -266,6 +273,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> optimisticLockEntries() {
+ if (groupLock())
+ return super.optimisticLockEntries();
+
return optimisticLockEntries;
}
@@ -407,6 +417,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
+ /** {@inheritDoc} */
+ @Override protected void addGroupTxMapping(Collection<IgniteTxKey> keys) {
+ super.addGroupTxMapping(keys);
+
+ addKeyMapping(cctx.localNode(), keys);
+ }
+
/**
* Adds key mapping to dht mapping.
*
@@ -546,7 +563,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers)
{
- Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes());
+ Collection<IgniteTxEntry> entries = groupLock() ?
+ Collections.singletonList(groupLockEntry()) :
+ F.concat(false, mapping.reads(), mapping.writes());
for (IgniteTxEntry txEntry : entries) {
while (true) {
@@ -663,13 +682,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> prepareAsync() {
- GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get();
+ @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+ GridNearTxPrepareFuture fut = (GridNearTxPrepareFuture)prepFut.get();
if (fut == null) {
// Future must be created before any exception can be thrown.
- fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) :
- new GridNearPessimisticTxPrepareFuture(cctx, this);
+ fut = new GridNearTxPrepareFuture<>(cctx, this);
if (!prepFut.compareAndSet(null, fut))
return prepFut.get();
@@ -680,7 +698,41 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
mapExplicitLocks();
- fut.prepare();
+ // For pessimistic mode we don't distribute prepare request and do not lock topology version
+ // as it was fixed on first lock.
+ if (pessimistic()) {
+ if (!state(PREPARING)) {
+ if (setRollbackOnly()) {
+ if (timedOut())
+ fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " +
+ "rolled back: " + this));
+ else
+ fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" +
+ state() + ", tx=" + this + ']'));
+ }
+ else
+ fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " +
+ "[state=" + state() + ", tx=" + this + ']'));
+
+ return fut;
+ }
+
+ try {
+ userPrepare();
+
+ // Make sure to add future before calling prepare.
+ cctx.mvcc().addFuture(fut);
+
+ fut.prepare();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+ }
+ else {
+ // In optimistic mode we must wait for topology map update.
+ fut.prepare();
+ }
return fut;
}
@@ -700,10 +752,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<?> prepareFut = prepFut.get();
+ IgniteInternalFuture<IgniteInternalTx> prepareFut = prepFut.get();
- prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
+ prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
GridNearTxFinishFuture fut0 = commitFut.get();
try {
@@ -747,7 +799,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<?> prepFut = this.prepFut.get();
+ IgniteInternalFuture<IgniteInternalTx> prepFut = this.prepFut.get();
if (prepFut == null || prepFut.isDone()) {
try {
@@ -771,8 +823,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
else {
- prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
+ prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
try {
// Check for errors in prepare future.
f.get();
@@ -815,11 +867,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @return Future that will be completed when locks are acquired.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
+ public IgniteInternalFuture<IgniteInternalTx> prepareAsyncLocal(
@Nullable Collection<IgniteTxEntry> reads,
@Nullable Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes, boolean last,
- Collection<UUID> lastBackups
+ Collection<UUID> lastBackups,
+ IgniteInClosure<GridNearTxPrepareResponse> completeCb
) {
if (state() != PREPARING) {
if (timedOut())
@@ -834,14 +887,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
init();
- GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture(
+ GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture<>(
cctx,
this,
IgniteUuid.randomUuid(),
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
needReturnValue() && implicit(),
- lastBackups);
+ lastBackups,
+ completeCb);
try {
// At this point all the entries passed in must be enlisted in transaction because this is an
@@ -880,7 +934,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
- return chainOnePhasePrepare(fut);
+ return fut;
}
/**
@@ -896,7 +950,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (pessimistic())
prepareAsync();
- IgniteInternalFuture<?> prep = prepFut.get();
+ IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
// Do not create finish future if there are no remote nodes.
if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
@@ -932,8 +986,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
else
- prep.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
+ prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
try {
f.get(); // Check for errors of a parent future.
@@ -969,7 +1023,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<?> prep = prepFut.get();
+ IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
if (prep == null || prep.isDone()) {
try {
@@ -985,8 +1039,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
fut.finish();
}
else
- prep.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
+ prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
try {
f.get(); // Check for errors of a parent future.
}
@@ -1179,7 +1233,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
return prepFut.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
new file mode 100644
index 0000000..f573187
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -0,0 +1,1050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import javax.cache.expiry.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+import static org.apache.ignite.transactions.TransactionState.*;
+
+/**
+ *
+ */
+public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+ implements GridCacheMvccFuture<IgniteInternalTx> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Context. */
+ private GridCacheSharedContext<K, V> cctx;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Transaction. */
+ @GridToStringInclude
+ private GridNearTxLocal tx;
+
+ /** Error. */
+ @GridToStringExclude
+ private AtomicReference<Throwable> err = new AtomicReference<>(null);
+
+ /** Trackable flag. */
+ private boolean trackable = true;
+
+ /** Full information about transaction nodes mapping. */
+ private GridDhtTxMapping<K, V> txMapping;
+
+ /** */
+ private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ */
+ public GridNearTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridNearTxLocal tx) {
+ super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
+ @Override public boolean collect(IgniteInternalTx e) {
+ return true;
+ }
+
+ @Override public IgniteInternalTx reduce() {
+ // Nothing to aggregate.
+ return tx;
+ }
+ });
+
+ assert cctx != null;
+ assert tx != null;
+
+ this.cctx = cctx;
+ this.tx = tx;
+
+ futId = IgniteUuid.randomUuid();
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFuture.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return tx.xidVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+ if (log.isDebugEnabled())
+ log.debug("Transaction future received owner changed callback: " + entry);
+
+ if (tx.optimistic()) {
+ if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
+ lockKeys.remove(entry.txKey());
+
+ // This will check for locks.
+ onDone();
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @return Involved nodes.
+ */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return
+ F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+ @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+ if (isMini(f))
+ return ((MiniFuture)f).node();
+
+ return cctx.discovery().localNode();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ trackable = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ boolean found = false;
+
+ for (IgniteInternalFuture<?> fut : futures())
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.node().id().equals(nodeId)) {
+ f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+
+ found = true;
+ }
+ }
+
+ return found;
+ }
+
+ /**
+ * @param nodeId Failed node ID.
+ * @param mappings Remaining mappings.
+ * @param e Error.
+ */
+ void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
+ if (err.compareAndSet(null, e)) {
+ boolean marked = tx.setRollbackOnly();
+
+ if (e instanceof IgniteTxOptimisticCheckedException) {
+ assert nodeId != null : "Missing node ID for optimistic failure exception: " + e;
+
+ tx.removeKeysMapping(nodeId, mappings);
+ }
+
+ if (e instanceof IgniteTxRollbackCheckedException) {
+ if (marked) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
+ }
+ }
+ }
+
+ onComplete();
+ }
+ }
+
+ /**
+ * @return {@code True} if all locks are owned.
+ */
+ private boolean checkLocks() {
+ boolean locked = lockKeys.isEmpty();
+
+ if (locked) {
+ if (log.isDebugEnabled())
+ log.debug("All locks are acquired for near prepare future: " + this);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
+ }
+
+ return locked;
+ }
+
+ /**
+ * @param e Error.
+ */
+ void onError(Throwable e) {
+ onError(null, null, e);
+ }
+
+ /**
+ * @param nodeId Sender.
+ * @param res Result.
+ */
+ public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ if (!isDone()) {
+ for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
+
+ f.onResult(nodeId, res);
+ }
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
+ // If locks were not acquired yet, delay completion.
+ if (isDone() || (err == null && !checkLocks()))
+ return false;
+
+ this.err.compareAndSet(null, err);
+
+ if (err == null)
+ tx.state(PREPARED);
+
+ if (super.onDone(tx, err)) {
+ // Don't forget to clean up.
+ cctx.mvcc().removeFuture(this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param f Future.
+ * @return {@code True} if mini-future.
+ */
+ private boolean isMini(IgniteInternalFuture<?> f) {
+ return f.getClass().equals(MiniFuture.class);
+ }
+
+ /**
+ * Completeness callback.
+ */
+ private void onComplete() {
+ if (super.onDone(tx, err.get()))
+ // Don't forget to clean up.
+ cctx.mvcc().removeFuture(this);
+ }
+
+ /**
+ * Completes this future.
+ */
+ void complete() {
+ onComplete();
+ }
+
+ /**
+ * Waits for topology exchange future to be ready and then prepares user transaction.
+ */
+ public void prepare() {
+ if (tx.optimistic()) {
+ // Obtain the topology version to use.
+ AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+ if (topVer != null) {
+ tx.topologyVersion(topVer);
+
+ prepare0();
+
+ return;
+ }
+
+ prepareOnTopology();
+
+ }
+ else
+ preparePessimistic();
+ }
+
+ /**
+ *
+ */
+ private void prepareOnTopology() {
+ GridDhtTopologyFuture topFut = topologyReadLock();
+
+ try {
+ if (topFut == null) {
+ assert isDone();
+
+ return;
+ }
+
+ if (topFut.isDone()) {
+ StringBuilder invalidCaches = new StringBuilder();
+ Boolean cacheInvalid = false;
+ for (GridCacheContext ctx : cctx.cacheContexts()) {
+ if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) {
+ if (cacheInvalid)
+ invalidCaches.append(", ");
+
+ invalidCaches.append(U.maskName(ctx.name()));
+
+ cacheInvalid = true;
+ }
+ }
+
+ if (cacheInvalid) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+ invalidCaches.toString()));
+
+ return;
+ }
+
+ tx.topologyVersion(topFut.topologyVersion());
+
+ prepare0();
+ }
+ else {
+ topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override
+ public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+ @Override
+ public void run() {
+ prepareOnTopology();
+ }
+ });
+ }
+ });
+ }
+ }
+ finally {
+ topologyReadUnlock();
+ }
+ }
+
+ /**
+ * Acquires topology read lock.
+ *
+ * @return Topology ready future.
+ */
+ private GridDhtTopologyFuture topologyReadLock() {
+ if (tx.activeCacheIds().isEmpty())
+ return cctx.exchange().lastTopologyFuture();
+
+ GridCacheContext<K, V> nonLocCtx = null;
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocCtx == null)
+ return cctx.exchange().lastTopologyFuture();
+
+ nonLocCtx.topology().readLock();
+
+ if (nonLocCtx.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ nonLocCtx.name()));
+
+ return null;
+ }
+
+ return nonLocCtx.topology().topologyVersionFuture();
+ }
+
+ /**
+ * Releases topology read lock.
+ */
+ private void topologyReadUnlock() {
+ if (!tx.activeCacheIds().isEmpty()) {
+ GridCacheContext<K, V> nonLocalCtx = null;
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocalCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocalCtx != null)
+ nonLocalCtx.topology().readUnlock();
+ }
+ }
+
+ /**
+ * Initializes future.
+ */
+ private void prepare0() {
+ assert tx.optimistic();
+
+ try {
+ if (!tx.state(PREPARING)) {
+ if (tx.setRollbackOnly()) {
+ if (tx.timedOut())
+ onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+ "was rolled back: " + this));
+ else
+ onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
+ "[state=" + tx.state() + ", tx=" + this + ']'));
+ }
+ else
+ onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+ "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+
+ return;
+ }
+
+ // Make sure to add future before calling prepare.
+ cctx.mvcc().addFuture(this);
+
+ prepare(
+ tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
+ tx.writeEntries());
+
+ markInitialized();
+ }
+ catch (TransactionTimeoutException | TransactionOptimisticException e) {
+ onError(cctx.localNodeId(), null, e);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+
+ /**
+ * @param reads Read entries.
+ * @param writes Write entries.
+ * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node.
+ */
+ private void prepare(
+ Iterable<IgniteTxEntry> reads,
+ Iterable<IgniteTxEntry> writes
+ ) throws IgniteCheckedException {
+ assert tx.optimistic();
+
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ assert topVer.topologyVersion() > 0;
+
+ txMapping = new GridDhtTxMapping<>();
+
+ ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings =
+ new ConcurrentLinkedDeque8<>();
+
+ if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
+ onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " +
+ "partition nodes left the grid): " + cacheCtx.name()));
+
+ return;
+ }
+ }
+ }
+
+ // Assign keys to primary nodes.
+ GridDistributedTxMapping cur = null;
+
+ for (IgniteTxEntry read : reads) {
+ GridDistributedTxMapping updated = map(read, topVer, cur, false);
+
+ if (cur != updated) {
+ mappings.offer(updated);
+
+ if (updated.node().isLocal()) {
+ if (read.context().isNear())
+ tx.nearLocallyMapped(true);
+ else if (read.context().isColocated())
+ tx.colocatedLocallyMapped(true);
+ }
+
+ cur = updated;
+ }
+ }
+
+ for (IgniteTxEntry write : writes) {
+ GridDistributedTxMapping updated = map(write, topVer, cur, true);
+
+ if (cur != updated) {
+ mappings.offer(updated);
+
+ if (updated.node().isLocal()) {
+ if (write.context().isNear())
+ tx.nearLocallyMapped(true);
+ else if (write.context().isColocated())
+ tx.colocatedLocallyMapped(true);
+ }
+
+ cur = updated;
+ }
+ }
+
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Abandoning (re)map because future is done: " + this);
+
+ return;
+ }
+
+ tx.addEntryMapping(mappings);
+
+ cctx.mvcc().recheckPendingLocks();
+
+ txMapping.initLast(mappings);
+
+ tx.transactionNodes(txMapping.transactionNodes());
+
+ checkOnePhase();
+
+ proceedPrepare(mappings);
+ }
+
+ /**
+ *
+ */
+ private void preparePessimistic() {
+ Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ txMapping = new GridDhtTxMapping<>();
+
+ for (IgniteTxEntry txEntry : tx.allEntries()) {
+ GridCacheContext cacheCtx = txEntry.context();
+
+ List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
+
+ ClusterNode primary = F.first(nodes);
+
+ boolean near = cacheCtx.isNear();
+
+ IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
+
+ GridDistributedTxMapping nodeMapping = mappings.get(key);
+
+ if (nodeMapping == null) {
+ nodeMapping = new GridDistributedTxMapping(primary);
+
+ nodeMapping.near(cacheCtx.isNear());
+
+ mappings.put(key, nodeMapping);
+ }
+
+ txEntry.nodeId(primary.id());
+
+ nodeMapping.add(txEntry);
+
+ txMapping.addMapping(nodes);
+ }
+
+ tx.transactionNodes(txMapping.transactionNodes());
+
+ checkOnePhase();
+
+ for (final GridDistributedTxMapping m : mappings.values()) {
+ final ClusterNode node = m.node();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ m.reads(),
+ m.writes(),
+ /*grp lock key*/null,
+ /*part lock*/false,
+ m.near(),
+ txMapping.transactionNodes(),
+ true,
+ txMapping.transactionNodes().get(node.id()),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash());
+
+ for (IgniteTxEntry txEntry : m.writes()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ final MiniFuture fut = new MiniFuture(m, null);
+
+ req.miniId(fut.futureId());
+
+ add(fut);
+
+ if (node.isLocal()) {
+ cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+ @Override public void apply(GridNearTxPrepareResponse res) {
+ fut.onResult(node.id(), res);
+ }
+ });
+ }
+ else {
+ try {
+ cctx.io().send(node, req, tx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ // Fail the whole thing.
+ fut.onResult(e);
+ }
+ }
+ }
+
+ markInitialized();
+ }
+
+ /**
+ * Checks if mapped transaction can be committed on one phase.
+ * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
+ */
+ private void checkOnePhase() {
+ if (tx.storeUsed())
+ return;
+
+ Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();
+
+ if (map.size() == 1) {
+ Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map);
+
+ assert entry != null;
+
+ Collection<UUID> backups = entry.getValue();
+
+ if (backups.size() <= 1)
+ tx.onePhaseCommit(true);
+ }
+ }
+
+ /**
+ * Continues prepare after previous mapping successfully finished.
+ *
+ * @param mappings Queue of mappings.
+ */
+ private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
+ if (isDone())
+ return;
+
+ final GridDistributedTxMapping m = mappings.poll();
+
+ if (m == null)
+ return;
+
+ assert !m.empty();
+
+ final ClusterNode n = m.node();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ tx.optimistic() && tx.serializable() ? m.reads() : null,
+ m.writes(),
+ tx.groupLockKey(),
+ tx.partitionLock(),
+ m.near(),
+ txMapping.transactionNodes(),
+ m.last(),
+ m.lastBackups(),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash());
+
+ for (IgniteTxEntry txEntry : m.writes()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ // Must lock near entries separately.
+ if (m.near()) {
+ try {
+ tx.optimisticLockEntries(req.writes());
+
+ tx.userPrepare();
+ }
+ catch (IgniteCheckedException e) {
+ onError(null, null, e);
+ }
+ }
+
+ final MiniFuture fut = new MiniFuture(m, mappings);
+
+ req.miniId(fut.futureId());
+
+ add(fut); // Append new future.
+
+ // If this is the primary node for the keys.
+ if (n.isLocal()) {
+ // At this point, if any new node joined, then it is
+ // waiting for this transaction to complete, so
+ // partition reassignments are not possible here.
+ cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+ @Override public void apply(GridNearTxPrepareResponse res) {
+ fut.onResult(n.id(), res);
+ }
+ });
+ }
+ else {
+ assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +
+ ", nodeId=" + n.id() + ']';
+
+ try {
+ cctx.io().send(n, req, tx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ // Fail the whole thing.
+ fut.onResult(e);
+ }
+ }
+ }
+
+ /**
+ * @param entry Transaction entry.
+ * @param topVer Topology version.
+ * @param cur Current mapping.
+ * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
+ * @return Mapping.
+ */
+ private GridDistributedTxMapping map(
+ IgniteTxEntry entry,
+ AffinityTopologyVersion topVer,
+ GridDistributedTxMapping cur,
+ boolean waitLock
+ ) throws IgniteCheckedException {
+ GridCacheContext cacheCtx = entry.context();
+
+ List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+
+ txMapping.addMapping(nodes);
+
+ ClusterNode primary = F.first(nodes);
+
+ assert primary != null;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Mapped key to primary node [key=" + entry.key() +
+ ", part=" + cacheCtx.affinity().partition(entry.key()) +
+ ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
+ }
+
+ if (tx.groupLock() && !primary.isLocal())
+ throw new IgniteCheckedException("Failed to prepare group lock transaction (local node is not primary for " +
+ " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']');
+
+ // Must re-initialize cached entry while holding topology lock.
+ if (cacheCtx.isNear())
+ entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
+ else if (!cacheCtx.isLocal())
+ entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
+ else
+ entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
+
+ if (cacheCtx.isNear() || cacheCtx.isLocal()) {
+ if (waitLock && entry.explicitVersion() == null) {
+ if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey()))
+ lockKeys.add(entry.txKey());
+ }
+ }
+
+ if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+ cur = new GridDistributedTxMapping(primary);
+
+ // Initialize near flag right away.
+ cur.near(cacheCtx.isNear());
+ }
+
+ cur.add(entry);
+
+ if (entry.explicitVersion() != null) {
+ tx.markExplicit(primary.id());
+
+ cur.markExplicitLock();
+ }
+
+ entry.nodeId(primary.id());
+
+ if (cacheCtx.isNear()) {
+ while (true) {
+ try {
+ GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
+
+ cached.dhtNodeId(tx.xidVersion(), primary.id());
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ entry.cached(cacheCtx.near().entryEx(entry.key()));
+ }
+ }
+ }
+
+ return cur;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearTxPrepareFuture.class, this, super.toString());
+ }
+
+ /**
+ * Mini-future for get operations. Mini-futures are only waiting on a single
+ * node as opposed to multiple nodes.
+ */
+ private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Keys. */
+ @GridToStringInclude
+ private GridDistributedTxMapping m;
+
+ /** Flag to signal some result being processed. */
+ private AtomicBoolean rcvRes = new AtomicBoolean(false);
+
+ /** Mappings to proceed prepare. */
+ private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+
+ /**
+ * @param m Mapping.
+ * @param mappings Queue of mappings to proceed with.
+ */
+ MiniFuture(
+ GridDistributedTxMapping m,
+ ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+ ) {
+ this.m = m;
+ this.mappings = mappings;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public ClusterNode node() {
+ return m.node();
+ }
+
+ /**
+ * @return Keys.
+ */
+ public GridDistributedTxMapping mapping() {
+ return m;
+ }
+
+ /**
+ * @param e Error.
+ */
+ void onResult(Throwable e) {
+ if (rcvRes.compareAndSet(false, true)) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+ // Fail.
+ onDone(e);
+ }
+ else
+ U.warn(log, "Received error after another result has been processed [fut=" +
+ GridNearTxPrepareFuture.this + ", mini=" + this + ']', e);
+ }
+
+ /**
+ * @param e Node failure.
+ */
+ void onResult(ClusterTopologyCheckedException e) {
+ if (isDone())
+ return;
+
+ if (rcvRes.compareAndSet(false, true)) {
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
+
+ // Fail the whole future (make sure not to remap on different primary node
+ // to prevent multiple lock coordinators).
+ onError(null, null, e);
+ }
+ }
+
+ /**
+ * @param nodeId Failed node ID.
+ * @param res Result callback.
+ */
+ void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ if (isDone())
+ return;
+
+ if (rcvRes.compareAndSet(false, true)) {
+ if (res.error() != null) {
+ // Fail the whole compound future.
+ onError(nodeId, mappings, res.error());
+ }
+ else {
+ assert F.isEmpty(res.invalidPartitions());
+
+ for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
+ IgniteTxEntry txEntry = tx.entry(entry.getKey());
+
+ assert txEntry != null;
+
+ GridCacheContext cacheCtx = txEntry.context();
+
+ while (true) {
+ try {
+ if (cacheCtx.isNear()) {
+ GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
+
+ CacheVersionedValue tup = entry.getValue();
+
+ nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
+ tup.version(), m.node().id(), tx.topologyVersion());
+ }
+ else if (txEntry.cached().detached()) {
+ GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
+
+ CacheVersionedValue tup = entry.getValue();
+
+ detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
+ }
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // Retry.
+ }
+ catch (IgniteCheckedException e) {
+ // Fail the whole compound future.
+ onError(nodeId, mappings, e);
+
+ return;
+ }
+ }
+ }
+
+ tx.implicitSingleResult(res.returnValue());
+
+ for (IgniteTxKey key : res.filterFailedKeys()) {
+ IgniteTxEntry txEntry = tx.entry(key);
+
+ assert txEntry != null : "Missing tx entry for write key: " + key;
+
+ txEntry.op(NOOP);
+
+ assert txEntry.context() != null;
+
+ ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
+
+ if (expiry != null)
+ txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
+ }
+
+ if (!m.empty()) {
+ // Register DHT version.
+ tx.addDhtVersion(m.node().id(), res.dhtVersion());
+
+ m.dhtVersion(res.dhtVersion());
+
+ if (m.near())
+ tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+ }
+
+ // Proceed prepare before finishing mini future.
+ if (mappings != null)
+ proceedPrepare(mappings);
+
+ // Finish this mini future.
+ onDone(tx);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ }
+ }
+}