You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 15:26:46 UTC
[39/49] ignite git commit: ignite-1607 Implemented deadlock-free
optimistic serializable tx mode
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 6db00ab..af43113 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,17 +35,14 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
-import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -57,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.transactions.TransactionOptimisticException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -69,7 +64,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
/**
*
*/
-public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter
+public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
implements GridCacheMvccFuture<IgniteInternalTx> {
/** */
@GridToStringInclude
@@ -82,7 +77,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
super(cctx, tx);
- assert tx.optimistic() : tx;
+ assert tx.optimistic() && !tx.serializable() : tx;
}
/** {@inheritDoc} */
@@ -139,11 +134,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
/**
- * @param nodeId Failed node ID.
- * @param mappings Remaining mappings.
* @param e Error.
*/
- void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
+ void onError(Throwable e) {
if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
if (tx.onePhaseCommit()) {
tx.markForBackupCheck();
@@ -157,12 +150,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
if (err.compareAndSet(null, e)) {
boolean marked = tx.setRollbackOnly();
- if (e instanceof IgniteTxOptimisticCheckedException) {
- assert nodeId != null : "Missing node ID for optimistic failure exception: " + e;
-
- tx.removeKeysMapping(nodeId, mappings);
- }
-
if (e instanceof IgniteTxRollbackCheckedException) {
if (marked) {
try {
@@ -199,7 +186,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
- for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+ for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
if (isMini(fut)) {
MiniFuture f = (MiniFuture)fut;
@@ -253,212 +240,38 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
return false;
}
- /** {@inheritDoc} */
- @Override public void prepare() {
- // Obtain the topology version to use.
- AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
-
- // If there is another system transaction in progress, use it's topology version to prevent deadlock.
- if (topVer == null && tx != null && tx.system()) {
- IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
-
- if (tx0 != null)
- topVer = tx0.topologyVersionSnapshot();
- }
-
- if (topVer != null) {
- tx.topologyVersion(topVer);
-
- cctx.mvcc().addFuture(this);
-
- prepare0(false, true);
-
- return;
- }
-
- prepareOnTopology(false, null);
- }
-
- /**
- * @param remap Remap flag.
- * @param c Optional closure to run after map.
- */
- private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
- GridDhtTopologyFuture topFut = topologyReadLock();
-
- AffinityTopologyVersion topVer = null;
-
- try {
- if (topFut == null) {
- assert isDone();
-
- return;
- }
-
- if (topFut.isDone()) {
- topVer = topFut.topologyVersion();
-
- if (remap)
- tx.onRemap(topVer);
- else
- tx.topologyVersion(topVer);
-
- if (!remap)
- cctx.mvcc().addFuture(this);
- }
- }
- finally {
- topologyReadUnlock();
- }
-
- if (topVer != null) {
- StringBuilder invalidCaches = null;
-
- for (Integer cacheId : tx.activeCacheIds()) {
- GridCacheContext ctx = cctx.cacheContext(cacheId);
-
- assert ctx != null : cacheId;
-
- Throwable err = topFut.validateCache(ctx);
-
- if (err != null) {
- if (invalidCaches != null)
- invalidCaches.append(", ");
- else
- invalidCaches = new StringBuilder();
-
- invalidCaches.append(U.maskName(ctx.name()));
- }
- }
-
- if (invalidCaches != null) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- invalidCaches.toString()));
-
- return;
- }
-
- prepare0(remap, false);
-
- if (c != null)
- c.run();
- }
- else {
- topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
- @Override public void run() {
- try {
- fut.get();
-
- prepareOnTopology(remap, c);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- finally {
- cctx.txContextReset();
- }
- }
- });
- }
- });
- }
- }
-
- /**
- * Acquires topology read lock.
- *
- * @return Topology ready future.
- */
- private GridDhtTopologyFuture topologyReadLock() {
- if (tx.activeCacheIds().isEmpty())
- return cctx.exchange().lastTopologyFuture();
-
- GridCacheContext<?, ?> nonLocCtx = null;
-
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- if (!cacheCtx.isLocal()) {
- nonLocCtx = cacheCtx;
-
- break;
- }
- }
-
- if (nonLocCtx == null)
- return cctx.exchange().lastTopologyFuture();
-
- nonLocCtx.topology().readLock();
-
- if (nonLocCtx.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- nonLocCtx.name()));
-
- return null;
- }
-
- return nonLocCtx.topology().topologyVersionFuture();
- }
-
- /**
- * Releases topology read lock.
- */
- private void topologyReadUnlock() {
- if (!tx.activeCacheIds().isEmpty()) {
- GridCacheContext<?, ?> nonLocCtx = null;
-
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- if (!cacheCtx.isLocal()) {
- nonLocCtx = cacheCtx;
-
- break;
- }
- }
-
- if (nonLocCtx != null)
- nonLocCtx.topology().readUnlock();
- }
- }
-
/**
* Initializes future.
*
* @param remap Remap flag.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
- private void prepare0(boolean remap, boolean topLocked) {
+ @Override protected void prepare0(boolean remap, boolean topLocked) {
try {
boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
if (!txStateCheck) {
if (tx.setRollbackOnly()) {
if (tx.timedOut())
- onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+ onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " +
"was rolled back: " + this));
else
- onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
+ onError(new IgniteCheckedException("Invalid transaction state for prepare " +
"[state=" + tx.state() + ", tx=" + this + ']'));
}
else
- onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+ onError(new IgniteTxRollbackCheckedException("Invalid transaction state for " +
"prepare [state=" + tx.state() + ", tx=" + this + ']'));
return;
}
- prepare(
- tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
- tx.writeEntries(),
- topLocked);
+ prepare(tx.writeEntries(), topLocked);
markInitialized();
}
- catch (TransactionTimeoutException | TransactionOptimisticException e) {
- onError(cctx.localNodeId(), null, e);
+ catch (TransactionTimeoutException e) {
+ onError( e);
}
catch (IgniteCheckedException e) {
onDone(e);
@@ -466,13 +279,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
/**
- * @param reads Read entries.
* @param writes Write entries.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
* @throws IgniteCheckedException If failed.
*/
private void prepare(
- Iterable<IgniteTxEntry> reads,
Iterable<IgniteTxEntry> writes,
boolean topLocked
) throws IgniteCheckedException {
@@ -484,7 +295,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
- if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+ if (!F.isEmpty(writes)) {
for (int cacheId : tx.activeCacheIds()) {
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
@@ -500,25 +311,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
// Assign keys to primary nodes.
GridDistributedTxMapping cur = null;
- for (IgniteTxEntry read : reads) {
- GridDistributedTxMapping updated = map(read, topVer, cur, false, topLocked);
-
- if (cur != updated) {
- mappings.offer(updated);
-
- if (updated.node().isLocal()) {
- if (read.context().isNear())
- tx.nearLocallyMapped(true);
- else if (read.context().isColocated())
- tx.colocatedLocallyMapped(true);
- }
-
- cur = updated;
- }
- }
-
for (IgniteTxEntry write : writes) {
- GridDistributedTxMapping updated = map(write, topVer, cur, true, topLocked);
+ GridDistributedTxMapping updated = map(write, topVer, cur, topLocked);
if (cur != updated) {
mappings.offer(updated);
@@ -576,7 +370,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
futId,
tx.topologyVersion(),
tx,
- tx.optimistic() && tx.serializable() ? m.reads() : null,
+ null,
m.writes(),
m.near(),
txMapping.transactionNodes(),
@@ -604,7 +398,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
tx.userPrepare();
}
catch (IgniteCheckedException e) {
- onError(null, null, e);
+ onError(e);
}
}
@@ -651,7 +445,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* @param entry Transaction entry.
* @param topVer Topology version.
* @param cur Current mapping.
- * @param waitLock Wait lock flag.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
* @return Mapping.
*/
@@ -659,7 +452,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
IgniteTxEntry entry,
AffinityTopologyVersion topVer,
@Nullable GridDistributedTxMapping cur,
- boolean waitLock,
boolean topLocked
) {
GridCacheContext cacheCtx = entry.context();
@@ -687,7 +479,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
if (cacheCtx.isNear() || cacheCtx.isLocal()) {
- if (waitLock && entry.explicitVersion() == null)
+ if (entry.explicitVersion() == null)
lockKeys.add(entry.txKey());
}
@@ -749,7 +541,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
*
*/
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
@@ -828,7 +620,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
// Fail the whole future (make sure not to remap on different primary node
// to prevent multiple lock coordinators).
- onError(null, null, e);
+ onError(e);
}
}
@@ -836,14 +628,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
* @param nodeId Failed node ID.
* @param res Result callback.
*/
- void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
if (isDone())
return;
if (rcvRes.compareAndSet(false, true)) {
if (res.error() != null) {
// Fail the whole compound future.
- onError(nodeId, mappings, res.error());
+ onError(res.error());
}
else {
if (res.clientRemapVersion() != null) {
@@ -877,7 +669,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
proceedPrepare(mappings);
// Finish this mini future.
- onDone(tx);
+ onDone((GridNearTxPrepareResponse)null);
}
}
}
@@ -889,7 +681,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
private void remap() {
prepareOnTopology(true, new Runnable() {
@Override public void run() {
- onDone(tx);
+ onDone((GridNearTxPrepareResponse)null);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
new file mode 100644
index 0000000..fd9183e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter {
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ */
+ public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+ super(cctx, tx);
+
+ assert tx.optimistic() : tx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void prepare() {
+ // Obtain the topology version to use.
+ AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+ // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+ if (topVer == null && tx != null && tx.system()) {
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+ if (tx0 != null)
+ topVer = tx0.topologyVersionSnapshot();
+ }
+
+ if (topVer != null) {
+ tx.topologyVersion(topVer);
+
+ cctx.mvcc().addFuture(this);
+
+ prepare0(false, true);
+
+ return;
+ }
+
+ prepareOnTopology(false, null);
+ }
+
+ /**
+ * Acquires topology read lock.
+ *
+ * @return Topology ready future.
+ */
+ protected final GridDhtTopologyFuture topologyReadLock() {
+ if (tx.activeCacheIds().isEmpty())
+ return cctx.exchange().lastTopologyFuture();
+
+ GridCacheContext<?, ?> nonLocCtx = null;
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocCtx == null)
+ return cctx.exchange().lastTopologyFuture();
+
+ nonLocCtx.topology().readLock();
+
+ if (nonLocCtx.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ nonLocCtx.name()));
+
+ return null;
+ }
+
+ return nonLocCtx.topology().topologyVersionFuture();
+ }
+
+ /**
+ * Releases topology read lock.
+ */
+ protected final void topologyReadUnlock() {
+ if (!tx.activeCacheIds().isEmpty()) {
+ GridCacheContext<?, ?> nonLocCtx = null;
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocCtx != null)
+ nonLocCtx.topology().readUnlock();
+ }
+ }
+
+ /**
+ * @param remap Remap flag.
+ * @param c Optional closure to run after map.
+ */
+ protected final void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
+ GridDhtTopologyFuture topFut = topologyReadLock();
+
+ AffinityTopologyVersion topVer = null;
+
+ try {
+ if (topFut == null) {
+ assert isDone();
+
+ return;
+ }
+
+ if (topFut.isDone()) {
+ topVer = topFut.topologyVersion();
+
+ if (remap)
+ tx.onRemap(topVer);
+ else
+ tx.topologyVersion(topVer);
+
+ if (!remap)
+ cctx.mvcc().addFuture(this);
+ }
+ }
+ finally {
+ topologyReadUnlock();
+ }
+
+ if (topVer != null) {
+ StringBuilder invalidCaches = null;
+
+ for (Integer cacheId : tx.activeCacheIds()) {
+ GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+ assert ctx != null : cacheId;
+
+ Throwable err = topFut.validateCache(ctx);
+
+ if (err != null) {
+ if (invalidCaches != null)
+ invalidCaches.append(", ");
+ else
+ invalidCaches = new StringBuilder();
+
+ invalidCaches.append(U.maskName(ctx.name()));
+ }
+ }
+
+ if (invalidCaches != null) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+ invalidCaches.toString()));
+
+ return;
+ }
+
+ prepare0(remap, false);
+
+ if (c != null)
+ c.run();
+ }
+ else {
+ topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+ @Override public void run() {
+ try {
+ fut.get();
+
+ prepareOnTopology(remap, c);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ finally {
+ cctx.txContextReset();
+ }
+ }
+ });
+ }
+ });
+ }
+ }
+
+ /**
+ * @param remap Remap flag.
+ * @param topLocked {@code True} if thread already acquired lock preventing topology change.
+ */
+ protected abstract void prepare0(boolean remap, boolean topLocked);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 62f9bb3..11d31b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -103,7 +103,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (!isDone()) {
assert res.clientRemapVersion() == null : res;
- for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+ for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
MiniFuture f = (MiniFuture)fut;
if (f.futureId().equals(res.miniId())) {
@@ -292,7 +292,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/**
*
*/
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
@@ -332,7 +332,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
else {
onPrepareResponse(m, res);
- onDone(tx);
+ onDone(res);
}
}
@@ -344,7 +344,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
tx.markForBackupCheck();
// Do not fail future for one-phase transaction right away.
- onDone(tx);
+ onDone((GridNearTxPrepareResponse)null);
}
onError(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index c3bb324..0e8aa0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -117,7 +117,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
@Nullable final Collection<? extends K> keys,
boolean forcePrimary,
boolean skipTx,
- @Nullable final GridCacheEntryEx entry,
@Nullable UUID subjId,
String taskName,
final boolean deserializePortable,
@@ -143,7 +142,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
@Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
return tx.getAllAsync(ctx,
ctx.cacheKeysView(keys),
- entry,
deserializePortable,
skipVals,
false,
@@ -156,7 +154,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
return loadAsync(null,
ctx.cacheKeysView(keys),
- false,
forcePrimary,
subjId,
taskName,
@@ -174,6 +171,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
* @param deserializePortable Deserialize portable flag.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
* @return Future.
*/
IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx,
@@ -181,21 +179,23 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
boolean readThrough,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals) {
+ boolean skipVals,
+ boolean needVer) {
assert tx != null;
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
readThrough,
- false,
- false,
+ /*force primary*/needVer,
tx,
CU.subjectId(tx, ctx.shared()),
tx.resolveTaskName(),
deserializePortable,
expiryPlc,
skipVals,
- /*can remap*/true);
+ /*can remap*/true,
+ needVer,
+ /*keepCacheObjects*/true);
// init() will register future for responses if it has remote mappings.
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 1a4f130..46c9f3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -266,9 +266,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
if (tx.onePhaseCommit()) {
- finishOnePhase();
+ boolean commit = this.commit && err == null;
- tx.tmFinish(commit && err == null);
+ finishOnePhase(commit);
+
+ tx.tmFinish(commit);
}
Throwable th = this.err.get();
@@ -510,9 +512,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- *
+ * @param commit Commit flag.
*/
- private void finishOnePhase() {
+ private void finishOnePhase(boolean commit) {
// No need to send messages as transaction was already committed on remote node.
// Finish local mapping only as we need send commit message to backups.
for (GridDistributedTxMapping m : mappings.values()) {
@@ -522,6 +524,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Add new future.
if (fut != null)
add(fut);
+
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ea96649..883c285 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -55,14 +55,15 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -342,32 +343,30 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> loadMissing(
+ @Override public IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
- boolean deserializePortable,
boolean skipVals,
- final IgniteBiInClosure<KeyCacheObject, Object> c
+ final boolean needVer,
+ final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
) {
if (cacheCtx.isNear()) {
return cacheCtx.nearTx().txLoadAsync(this,
keys,
readThrough,
- deserializePortable,
+ /*deserializePortable*/false,
accessPolicy(cacheCtx, keys),
- skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
- @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
+ skipVals,
+ needVer).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
+ @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
try {
Map<Object, Object> map = f.get();
- // Must loop through keys, not map entries,
- // as map entries may not have all the keys.
- for (KeyCacheObject key : keys)
- c.apply(key, map.get(key.value(cacheCtx.cacheObjectContext(), false)));
+ processLoaded(map, keys, needVer, c);
- return true;
+ return null;
}
catch (Exception e) {
setRollbackOnly();
@@ -381,39 +380,73 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return cacheCtx.colocated().loadAsync(
keys,
readThrough,
- /*reload*/false,
- /*force primary*/false,
+ /*force primary*/needVer,
topologyVersion(),
CU.subjectId(this, cctx),
resolveTaskName(),
- deserializePortable,
+ /*deserializePortable*/false,
accessPolicy(cacheCtx, keys),
skipVals,
- /*can remap*/true
- ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
- @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
- try {
- Map<Object, Object> map = f.get();
-
- // Must loop through keys, not map entries,
- // as map entries may not have all the keys.
- for (KeyCacheObject key : keys)
- c.apply(key, map.get(key.value(cacheCtx.cacheObjectContext(), false)));
-
- return true;
- }
- catch (Exception e) {
- setRollbackOnly();
-
- throw new GridClosureException(e);
- }
+ /*can remap*/true,
+ needVer,
+ /*keepCacheObject*/true
+ ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
+ @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
+ try {
+ Map<Object, Object> map = f.get();
+
+ processLoaded(map, keys, needVer, c);
+
+ return null;
}
- });
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
+ }
+ });
}
else {
assert cacheCtx.isLocal();
- return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c);
+ return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, needVer, c);
+ }
+ }
+
+ /**
+ * @param map Loaded values.
+ * @param keys Keys.
+ * @param needVer If {@code true} version is required for loaded values.
+ * @param c Closure.
+ */
+ private void processLoaded(
+ Map<Object, Object> map,
+ final Collection<KeyCacheObject> keys,
+ boolean needVer,
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
+ for (KeyCacheObject key : keys) {
+ Object val = map.get(key);
+
+ if (val != null) {
+ Object v;
+ GridCacheVersion ver;
+
+ if (needVer) {
+ T2<Object, GridCacheVersion> t = (T2)val;
+
+ v = t.get1();
+ ver = t.get2();
+ }
+ else {
+ v = val;
+ ver = null;
+ }
+
+ c.apply(key, v, ver);
+ }
+ else
+ c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER);
}
}
@@ -555,36 +588,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
-
- /**
- * Removes mapping in case of optimistic tx failure on primary node.
- *
- * @param failedNodeId Failed node ID.
- * @param mapQueue Mappings queue.
- */
- void removeKeysMapping(UUID failedNodeId, Iterable<GridDistributedTxMapping> mapQueue) {
- assert failedNodeId != null;
- assert mapQueue != null;
-
- mappings.remove(failedNodeId);
-
- if (!F.isEmpty(mapQueue)) {
- for (GridDistributedTxMapping m : mapQueue) {
- UUID nodeId = m.node().id();
-
- GridDistributedTxMapping mapping = mappings.get(nodeId);
-
- if (mapping != null) {
- for (IgniteTxEntry entry : m.entries())
- mapping.removeEntry(entry);
-
- if (mapping.entries().isEmpty())
- mappings.remove(nodeId);
- }
- }
- }
- }
-
/**
* @param nodeId Node ID to mark with explicit lock.
* @return {@code True} if mapping was found.
@@ -621,7 +624,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers)
{
- Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes());
+ Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), mapping.reads());
for (IgniteTxEntry txEntry : entries) {
while (true) {
@@ -743,8 +746,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (fut == null) {
// Future must be created before any exception can be thrown.
- fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) :
- new GridNearPessimisticTxPrepareFuture(cctx, this);
+ if (optimistic()) {
+ fut = serializable() ?
+ new GridNearOptimisticSerializableTxPrepareFuture(cctx, this) :
+ new GridNearOptimisticTxPrepareFuture(cctx, this);
+ }
+ else
+ fut = new GridNearPessimisticTxPrepareFuture(cctx, this);
if (!prepFut.compareAndSet(null, fut))
return prepFut.get();
@@ -871,7 +879,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
@Nullable Collection<IgniteTxEntry> reads,
@Nullable Collection<IgniteTxEntry> writes,
- Map<UUID, Collection<UUID>> txNodes, boolean last,
+ Map<UUID, Collection<UUID>> txNodes,
+ boolean last,
Collection<UUID> lastBackups
) {
if (state() != PREPARING) {
@@ -899,7 +908,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
try {
// At this point all the entries passed in must be enlisted in transaction because this is an
// optimistic transaction.
- optimisticLockEntries = writes;
+ optimisticLockEntries = (serializable() && optimistic()) ? F.concat(false, writes, reads) : writes;
userPrepare();
@@ -1192,12 +1201,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return plc;
}
- /**
- * @param cacheCtx Cache context.
- * @param keys Keys.
- * @return Expiry policy.
- */
- private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+ /** {@inheritDoc} */
+ @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
if (accessMap != null) {
for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key()))
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index fac7a12..45477a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -48,15 +48,15 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
/**
* Common code for tx prepare in optimistic and pessimistic modes.
*/
-public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentityFuture<IgniteInternalTx>
+public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** */
- private static final IgniteReducer<IgniteInternalTx, IgniteInternalTx> REDUCER =
- new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
- @Override public boolean collect(IgniteInternalTx e) {
+ private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx> REDUCER =
+ new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() {
+ @Override public boolean collect(GridNearTxPrepareResponse e) {
return true;
}
@@ -94,7 +94,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentit
* @param tx Transaction.
*/
public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) {
- super(cctx.kernalContext(), REDUCER);
+ super(REDUCER);
assert cctx != null;
assert tx != null;
@@ -201,6 +201,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentit
}
catch (GridCacheEntryRemovedException ignored) {
// Retry.
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index cacac13..85ed881 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -67,6 +67,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
*
* @param threadId Owning thread ID.
* @param ver Lock version.
+ * @param serOrder Version for serializable transactions ordering.
+ * @param serReadVer Optional read entry version for optimistic serializable transaction.
* @param timeout Timeout to acquire lock.
* @param reenter Reentry flag.
* @param tx Transaction flag.
@@ -77,6 +79,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
@Nullable public GridCacheMvccCandidate addLocal(
long threadId,
GridCacheVersion ver,
+ @Nullable GridCacheVersion serOrder,
+ @Nullable GridCacheVersion serReadVer,
long timeout,
boolean reenter,
boolean tx,
@@ -91,6 +95,11 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
synchronized (this) {
checkObsolete();
+ if (serReadVer != null) {
+ if (!checkSerializableReadVersion(serReadVer))
+ return null;
+ }
+
GridCacheMvcc mvcc = mvccExtras();
if (mvcc == null) {
@@ -103,12 +112,16 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
cand = mvcc.addLocal(
this,
+ /*nearNodeId*/null,
+ /*nearVer*/null,
threadId,
ver,
timeout,
+ serOrder,
reenter,
tx,
- implicitSingle
+ implicitSingle,
+ /*dht-local*/false
);
owner = mvcc.localOwner();
@@ -191,10 +204,16 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
}
/** {@inheritDoc} */
- @Override public boolean tmLock(IgniteInternalTx tx, long timeout) throws GridCacheEntryRemovedException {
+ @Override public boolean tmLock(IgniteInternalTx tx,
+ long timeout,
+ @Nullable GridCacheVersion serOrder,
+ GridCacheVersion serReadVer)
+ throws GridCacheEntryRemovedException {
GridCacheMvccCandidate cand = addLocal(
tx.threadId(),
tx.xidVersion(),
+ serOrder,
+ serReadVer,
timeout,
/*reenter*/false,
/*tx*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 7018c4e..cb14b4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -225,6 +225,8 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
GridCacheMvccCandidate c = entry.addLocal(
threadId,
lockVer,
+ null,
+ null,
timeout,
!inTx(),
inTx(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 0bf6ea2..8446665 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -495,7 +495,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Nullable final Collection<? extends K> keys,
final boolean forcePrimary,
boolean skipTx,
- @Nullable final GridCacheEntryEx entry,
@Nullable UUID subjId,
final String taskName,
final boolean deserializePortable,
@@ -595,10 +594,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
catch (GridCacheEntryRemovedException ignored) {
// No-op, retry.
}
- catch (GridCacheFilterFailedException ignored) {
- // No-op, skip the key.
- break;
- }
finally {
if (entry != null)
ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
@@ -615,7 +610,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return getAllAsync(
keys,
opCtx == null || !opCtx.skipStore(),
- null,
false,
subjId,
taskName,
@@ -1284,9 +1278,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
catch (GridCacheEntryRemovedException ignore) {
assert false : "Entry cannot become obsolete while holding lock.";
}
- catch (GridCacheFilterFailedException ignore) {
- assert false : "Filter should never fail with failFast=false and empty filter.";
- }
}
// Store final batch.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index c0c2284..716676f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -147,12 +147,6 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
cctx.kernalContext().gateway().readLock();
try {
- TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration();
-
- if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
- throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " +
- "'txSerializableEnabled' configuration property)");
-
IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx);
if (tx != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 7d7e3e8..1c82636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -325,7 +325,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
threadId = Thread.currentThread().getId();
- log = U.logger(cctx.kernalContext(), logRef, this);
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, this);
}
/**
@@ -374,7 +375,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
implicitSingle = false;
loc = false;
- log = U.logger(cctx.kernalContext(), logRef, this);
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, this);
}
/** {@inheritDoc} */
@@ -430,6 +432,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> optimisticLockEntries() {
+ if (serializable() && optimistic())
+ return F.concat(false, writeEntries(), readEntries());
+
return writeEntries();
}
@@ -1267,88 +1272,81 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
if (F.isEmpty(txEntry.entryProcessors()))
return F.t(txEntry.op(), txEntry.value());
else {
- try {
- boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
+ boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
- CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() :
- txEntry.cached().innerGet(this,
- /*swap*/false,
- /*read through*/false,
- /*fail fast*/true,
- /*unmarshal*/true,
- /*metrics*/metrics,
- /*event*/recordEvt,
- /*temporary*/true,
- /*subjId*/subjId,
- /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
- resolveTaskName(),
- null);
+ CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() :
+ txEntry.cached().innerGet(this,
+ /*swap*/false,
+ /*read through*/false,
+ /*fail fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/metrics,
+ /*event*/recordEvt,
+ /*temporary*/true,
+ /*subjId*/subjId,
+ /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
+ resolveTaskName(),
+ null);
- boolean modified = false;
+ boolean modified = false;
- Object val = null;
+ Object val = null;
- Object key = null;
+ Object key = null;
- GridCacheVersion ver;
-
- try {
- ver = txEntry.cached().version();
- }
- catch (GridCacheEntryRemovedException e) {
- assert optimistic() : txEntry;
+ GridCacheVersion ver;
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+ try {
+ ver = txEntry.cached().version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
- ver = null;
- }
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
- for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
- CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(),
- txEntry.key(), key, cacheVal, val, ver);
+ ver = null;
+ }
- try {
- EntryProcessor<Object, Object, Object> processor = t.get1();
+ for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
+ CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(),
+ txEntry.key(), key, cacheVal, val, ver);
- processor.process(invokeEntry, t.get2());
+ try {
+ EntryProcessor<Object, Object, Object> processor = t.get1();
- val = invokeEntry.getValue();
+ processor.process(invokeEntry, t.get2());
- key = invokeEntry.key();
- }
- catch (Exception ignore) {
- // No-op.
- }
+ val = invokeEntry.getValue();
- modified |= invokeEntry.modified();
+ key = invokeEntry.key();
+ }
+ catch (Exception ignore) {
+ // No-op.
}
- if (modified)
- cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
+ modified |= invokeEntry.modified();
+ }
- GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP;
+ if (modified)
+ cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
- if (op == NOOP) {
- ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+ GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP;
- if (expiry != null) {
- long ttl = CU.toTtl(expiry.getExpiryForAccess());
+ if (op == NOOP) {
+ ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
- txEntry.ttl(ttl);
+ if (expiry != null) {
+ long ttl = CU.toTtl(expiry.getExpiryForAccess());
- if (ttl == CU.TTL_ZERO)
- op = DELETE;
- }
- }
+ txEntry.ttl(ttl);
- return F.t(op, cacheVal);
+ if (ttl == CU.TTL_ZERO)
+ op = DELETE;
+ }
}
- catch (GridCacheFilterFailedException e) {
- assert false : "Empty filter failed for innerGet: " + e;
- return null;
- }
+ return F.t(op, cacheVal);
}
}
@@ -1498,9 +1496,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
* @param e Entry to evict if it qualifies for eviction.
* @param primaryOnly Flag to try to evict only on primary node.
* @return {@code True} if attempt was made to evict the entry.
- * @throws IgniteCheckedException If failed.
*/
- protected boolean evictNearEntry(IgniteTxEntry e, boolean primaryOnly) throws IgniteCheckedException {
+ protected boolean evictNearEntry(IgniteTxEntry e, boolean primaryOnly) {
assert e != null;
if (isNearLocallyMapped(e, primaryOnly)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 2462dda..9eb2808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -66,6 +66,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** */
private static final long serialVersionUID = 0L;
+ /** Dummy version for non-existing entry read in SERIALIZABLE transaction. */
+ public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0);
+
+ /** Dummy version for any existing entry read in SERIALIZABLE transaction. */
+ public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1);
+
/** Owning transaction. */
@GridToStringExclude
@GridDirectTransient
@@ -175,6 +181,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
*/
private byte flags;
+ /** */
+ private GridCacheVersion serReadVer;
+
/**
* Required by {@link Externalizable}
*/
@@ -316,6 +325,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
cp.conflictVer = conflictVer;
cp.expiryPlc = expiryPlc;
cp.flags = flags;
+ cp.serReadVer = serReadVer;
return cp;
}
@@ -822,6 +832,23 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
this.entryProcessorCalcVal = entryProcessorCalcVal;
}
+ /**
+ * @return Read version for serializable transaction.
+ */
+ @Nullable public GridCacheVersion serializableReadVersion() {
+ return serReadVer;
+ }
+
+ /**
+ * @param serReadVer Read version for serializable transaction.
+ */
+ public void serializableReadVersion(GridCacheVersion serReadVer) {
+ assert this.serReadVer == null;
+ assert serReadVer != null;
+
+ this.serReadVer = serReadVer;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -884,18 +911,24 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
writer.incrementState();
case 8:
- if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
+ if (!writer.writeMessage("serReadVer", serReadVer))
return false;
writer.incrementState();
case 9:
- if (!writer.writeLong("ttl", ttl))
+ if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
return false;
writer.incrementState();
case 10:
+ if (!writer.writeLong("ttl", ttl))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
if (!writer.writeMessage("val", val))
return false;
@@ -979,7 +1012,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 8:
- transformClosBytes = reader.readByteArray("transformClosBytes");
+ serReadVer = reader.readMessage("serReadVer");
if (!reader.isLastRead())
return false;
@@ -987,7 +1020,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 9:
- ttl = reader.readLong("ttl");
+ transformClosBytes = reader.readByteArray("transformClosBytes");
if (!reader.isLastRead())
return false;
@@ -995,6 +1028,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
case 10:
+ ttl = reader.readLong("ttl");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -1014,7 +1055,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 11;
+ return 12;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 530fbdf..d9786a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
@@ -417,7 +418,8 @@ public class IgniteTxHandler {
if (tx.isRollbackOnly()) {
try {
- tx.rollback();
+ if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
+ tx.rollback();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to rollback transaction: " + tx, e);