You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/03 03:03:22 UTC
[13/50] [abbrv] ignite git commit: Merge branch master into ignite-264
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --cc modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index de9d112,0540148..3fa39cc
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@@ -141,12 -142,8 +142,12 @@@ public class MessageCodeGenerator
MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
- gen.generateAndWrite(DataStreamerEntry.class);
+ // gen.generateAndWrite(DataStreamerEntry.class);
+// gen.generateAndWrite(GridDistributedUnlockRequest.class);
+// gen.generateAndWrite(GridNearUnlockRequest.class);
+// gen.generateAndWrite(GridDhtUnlockRequest.class);
+//
// gen.generateAndWrite(GridDistributedLockRequest.class);
// gen.generateAndWrite(GridDistributedLockResponse.class);
// gen.generateAndWrite(GridNearLockRequest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 0000000,663ed90..2978c77
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@@ -1,0 -1,506 +1,530 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
++import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.jetbrains.annotations.*;
+
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+
+ /**
+ * Future verifying that all remote transactions related to the given transaction were prepared or committed.
+ */
+ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Trackable flag. */
+ private boolean trackable = true;
+
+ /** Context. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** Future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Transaction. */
+ private final IgniteInternalTx tx;
+
+ /** All involved nodes. */
+ private final Map<UUID, ClusterNode> nodes;
+
+ /** ID of the failed node that started the transaction. */
+ private final UUID failedNodeId;
+
+ /** Transaction nodes mapping. */
+ private final Map<UUID, Collection<UUID>> txNodes;
+
+ /** */
+ private final boolean nearTxCheck;
+
+     /**
+      * Creates the recovery future and collects the remote nodes that participated in the transaction
+      * and are still known to discovery. The local node and the failed node are never collected.
+      *
+      * @param cctx Context.
+      * @param tx Transaction.
+      * @param failedNodeId ID of the failed node that started the transaction.
+      * @param txNodes Transaction mapping.
+      */
+     @SuppressWarnings("ConstantConditions")
+     public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
+         IgniteInternalTx tx,
+         UUID failedNodeId,
+         Map<UUID, Collection<UUID>> txNodes)
+     {
+         super(cctx.kernalContext(), CU.boolReducer());
+
+         this.cctx = cctx;
+         this.tx = tx;
+         this.txNodes = txNodes;
+         this.failedNodeId = failedNodeId;
+
+         if (log == null)
+             log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
+
+         nodes = new GridLeanMap<>();
+
+         // Both the keys and the values of the transaction's node mapping are candidates.
+         for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
+             registerNode(e.getKey());
+
+             for (UUID nodeId : e.getValue())
+                 registerNode(nodeId);
+         }
+
+         UUID nearNodeId = tx.eventNodeId();
+
+         // Near node is checked only if it is not the failed node and is still alive.
+         nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
+     }
+
+     /**
+      * Adds the node to {@link #nodes} unless it is the local node, the failed node, or already registered.
+      * A node that discovery no longer knows is only logged, using the ID of that very node.
+      *
+      * @param nodeId Node ID.
+      */
+     private void registerNode(UUID nodeId) {
+         if (cctx.localNodeId().equals(nodeId) || failedNodeId.equals(nodeId) || nodes.containsKey(nodeId))
+             return;
+
+         ClusterNode node = cctx.discovery().node(nodeId);
+
+         if (node != null)
+             nodes.put(node.id(), node);
+         else if (log.isDebugEnabled())
+             log.debug("Transaction node left (will ignore) " + nodeId);
+     }
+
+ /**
+ * Initializes future.
+ * <p>
+ * When {@code nearTxCheck} is set, only the near node ({@code tx.eventNodeId()}) is asked whether the
+ * transaction was committed: through {@code cctx.tm().txCommitted(...)} if the near node is the local node,
+ * otherwise through a {@link GridCacheTxRecoveryRequest} sent to it. When {@code nearTxCheck} is not set,
+ * the local node is checked first (only if more than one local transaction is expected) and then
+ * {@link #proceedPrepare()} sends the requests to the remote nodes.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void prepare() {
+ if (nearTxCheck) {
+ UUID nearNodeId = tx.eventNodeId();
+
+ if (cctx.localNodeId().equals(nearNodeId)) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ try {
+ onDone(fut.get()); // Result of the local check becomes the result of this future.
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ 0,
+ true,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nearNodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+
+ markInitialized();
+ }
+
+ return; // Near-node check path never falls through to the local/remote checks below.
+ }
+
+ // First check transactions on local node.
+ int locTxNum = nodeTransactions(cctx.localNodeId());
+
+ if (locTxNum > 1) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
+
+ if (fut == null || fut.isDone()) {
+ boolean prepared;
+
+ try {
+ prepared = fut == null ? true : fut.get(); // A null future is treated as "prepared".
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+
+ return;
+ }
+ }
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ boolean prepared;
+
+ try {
+ prepared = fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+ }
+ else
+ proceedPrepare();
+ }
+ });
+
+ return; // Remote checks are started from the listener once the local check completes.
+ }
+ }
+
+ proceedPrepare();
+ }
+
+ /**
+ * Process prepare after local check.
+ * <p>
+ * For every entry of {@code txNodes} a {@link MiniFuture} is added and a {@link GridCacheTxRecoveryRequest}
+ * is sent: to each eligible backup if the entry key is the failed node, otherwise to the entry key itself.
+ * A {@link ClusterTopologyCheckedException} on send is reported to the mini-future as a left node, any other
+ * {@link IgniteCheckedException} as an error. The future is marked initialized at the end.
+ */
+ private void proceedPrepare() {
+ for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
+ UUID nodeId = entry.getKey();
+
+ // Skip the local node when it is absent from 'nodes'. NOTE(review): the condition uses '&&', so nodes that left are not skipped here - confirm intent.
+ if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
+ continue;
+
+ /*
+ * If primary node failed then send message to all backups, otherwise
+ * send message only to primary node.
+ */
+
+ if (nodeId.equals(failedNodeId)) {
+ for (UUID id : entry.getValue()) {
+ // Skip backup node if it is the local node or if it was also mapped as primary.
+ if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
+ continue;
+
+ MiniFuture fut = new MiniFuture(id);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx,
+ nodeTransactions(id),
+ false,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(id, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break; // Leaves only the backups loop; the outer loop over txNodes continues.
+ }
+ }
+ }
+ else {
+ MiniFuture fut = new MiniFuture(nodeId);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ nodeTransactions(nodeId),
+ false,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break; // Stops sending to the remaining txNodes entries.
+ }
+ }
+ }
+
++ // Specifically check originating near node. NOTE(review): this block came in with the merge - confirm it is still needed next to the nearTxCheck path in prepare().
++ if (tx instanceof GridDhtTxRemote) {
++ UUID nearNodeId = ((GridDhtTxRemote)tx).nearNodeId();
++
++ if (cctx.localNodeId().equals(nearNodeId))
++ add(cctx.tm().nearTxCommitted(tx.nearXidVersion()));
++ else {
++ MiniFuture fut = new MiniFuture(nearNodeId);
++
++ add(fut);
++
++ GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>( // NOTE(review): K and V are not declared by this class in the visible source - confirm this compiles.
++ tx, 1, futureId(), fut.futureId(), true);
++
++ try {
++ cctx.io().send(nearNodeId, req, tx.ioPolicy());
++ }
++ catch (ClusterTopologyCheckedException ignored) {
++ fut.onNodeLeft();
++ }
++ catch (IgniteCheckedException e) {
++ fut.onError(e);
++ }
++ }
++ }
++
+ markInitialized();
+ }
+
+     /**
+      * Counts the transactions expected on the given node: one if the node is a key of the transaction
+      * mapping, plus one for every mapping entry whose backup collection contains the node.
+      *
+      * @param nodeId Node ID.
+      * @return Number of transactions on node.
+      */
+     private int nodeTransactions(UUID nodeId) {
+         int total = 0;
+
+         // Node mapped as primary contributes one transaction.
+         if (txNodes.containsKey(nodeId))
+             total++;
+
+         for (Collection<UUID> backupIds : txNodes.values()) {
+             boolean found = false;
+
+             // Stop scanning this collection at the first match: it counts at most once.
+             for (Iterator<UUID> it = backupIds.iterator(); !found && it.hasNext(); )
+                 found = it.next().equals(nodeId);
+
+             if (found)
+                 total++;
+         }
+
+         return total;
+     }
+
+ /**
+ * Passes a recovery response to the pending mini-future whose ID equals {@code res.miniId()}.
+ * Does nothing if this future is already done or if no pending mini-future matches.
+ *
+ * @param nodeId Node ID; asserted to be equal to the node ID of the matching mini-future.
+ * @param res Response.
+ */
+ public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
+ if (!isDone()) {
+ for (IgniteInternalFuture<Boolean> fut : pending()) {
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.nodeId().equals(nodeId);
+
+ f.onResult(res);
+
+ break; // Mini-future found, no need to look further.
+ }
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return tx.xidVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return nodes.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ for (IgniteInternalFuture<?> fut : futures())
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
- if (f.nodeId().equals(nodeId)) {
++ if (f.nodeId().equals(nodeId))
+ f.onNodeLeft();
-
- return true;
- }
+ }
+
- return false;
++ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ trackable = false;
+ }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * When this call completes the future, the future is removed from the MVCC manager and the transaction
+      * is either finished with the obtained result or, on error, salvaged. The only error that does not
+      * salvage the transaction is a {@link ClusterTopologyCheckedException} raised during a near node check.
+      *
+      * @return {@code True} if this call completed the future, {@code false} if it was already completed.
+      */
+     @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+         // Somebody else already completed the future: nothing to clean up.
+         if (!super.onDone(res, err))
+             return false;
+
+         cctx.mvcc().removeFuture(this);
+
+         if (err == null) {
+             assert res != null;
+
+             cctx.tm().finishTxOnRecovery(tx, res);
+         }
+         else if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to check transaction on near node, " +
+                     "ignoring [err=" + err + ", tx=" + tx + ']');
+         }
+         else {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to check prepared transactions, " +
+                     "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+
+             cctx.tm().salvageTx(tx);
+         }
+
+         // Report the completion to the caller; the original returned false on this path as well.
+         return true;
+     }
+
+ /**
+ * @param f Future.
+ * @return {@code True} if mini-future.
+ */
+ private boolean isMini(IgniteInternalFuture<?> f) {
+ return f.getClass().equals(MiniFuture.class);
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryFuture.class, this, "super", super.toString());
+ }
+
+ /**
+ * Mini-future created for a single node ID; it has its own future ID which {@code onResult(...)} of the
+ * enclosing future uses to match a response.
+ */
+ private class MiniFuture extends GridFutureAdapter<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Mini future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /**
+ * @param nodeId Node ID.
+ */
+ private MiniFuture(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ private UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ private IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * Completes this mini-future with the given error.
+ *
+ * @param e Error.
+ */
+ private void onError(Throwable e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+ onDone(e);
+ }
+
+ /**
+ * Handles departure of the node. During a near node check it calls {@code cctx.tm().commitIfPrepared(tx)}
+ * and completes this mini-future with a {@link ClusterTopologyCheckedException}; otherwise it completes
+ * this mini-future with {@code true}.
+ */
+ private void onNodeLeft() {
+ if (log.isDebugEnabled())
+ log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
+
+ if (nearTxCheck) {
+ // Near and originating nodes left, need initiate tx check.
+ cctx.tm().commitIfPrepared(tx);
+
+ onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+ }
+ else
+ onDone(true);
+ }
+
+ /**
+ * Completes this mini-future with the success flag of the response.
+ *
+ * @param res Response.
+ */
+ private void onResult(GridCacheTxRecoveryResponse res) {
+ onDone(res.success());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
index 0000000,2f49ef4..15d2141
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
@@@ -1,0 -1,261 +1,268 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.extensions.communication.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Message sent to check that transactions related to the given transaction were prepared on a remote node.
+ */
+ public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Near transaction ID. */
+ private GridCacheVersion nearXidVer;
+
+ /** Expected number of transactions on node. */
+ private int txNum;
+
+ /** System transaction flag. */
+ private boolean sys;
+
+ /** {@code True} if should check only tx on near node. */
+ private boolean nearTxCheck;
+
+ /**
+ * Empty constructor required by {@link Externalizable}
+ */
+ public GridCacheTxRecoveryRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param tx Transaction.
+ * @param txNum Expected number of transactions on remote node.
+ * @param nearTxCheck {@code True} if should check only tx on near node.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ */
+ public GridCacheTxRecoveryRequest(IgniteInternalTx tx,
+ int txNum,
+ boolean nearTxCheck,
+ IgniteUuid futId,
+ IgniteUuid miniId)
+ {
+ super(tx.xidVersion(), 0);
+
+ nearXidVer = tx.nearXidVersion();
+ sys = tx.system();
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.txNum = txNum;
+ this.nearTxCheck = nearTxCheck;
+ }
+
+ /**
+ * @return {@code True} if should check only tx on near node.
+ */
+ public boolean nearTxCheck() {
+ return nearTxCheck;
+ }
+
+ /**
+ * @return Near version.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Expected number of transactions on node.
+ */
+ public int transactions() {
+ return txNum;
+ }
+
+ /**
+ * @return System transaction flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
++    /**
++     * Returns the near check flag. The flag is stored in the {@code nearTxCheck} field, so this method
++     * returns the same value as {@link #nearTxCheck()}.
++     *
++     * @return Near check flag.
++     */
++    public boolean nearCheck() {
++        return nearTxCheck;
++    }
++
++
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 7:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeBoolean("sys", sys))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeInt("txNum", txNum))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 7:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ nearTxCheck = reader.readBoolean("nearTxCheck");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ nearXidVer = reader.readMessage("nearXidVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ sys = reader.readBoolean("sys");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ txNum = reader.readInt("txNum");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 16;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
- return 13;
++ return 12; // NOTE(review): writeTo()/readFrom() of this class still handle state 12 - confirm that 12 (and not 13) is the intended field count after the merge.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryRequest.class, this, "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 63c28e9,7b84f32..f759fcc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@@ -55,15 -54,16 +54,6 @@@ public abstract class GridDistributedBa
@GridToStringExclude
private byte[] candsByIdxBytes;
- /** Collections of local lock candidates. */
- /** Committed versions with order higher than one for this message (needed for commit ordering). */
-- @GridToStringInclude
- @GridDirectTransient
- private Map<KeyCacheObject, Collection<GridCacheMvccCandidate>> candsByKey;
- @GridDirectCollection(GridCacheVersion.class)
- private Collection<GridCacheVersion> committedVers;
--
- /** Collections of local lock candidates in serialized form. */
- @GridToStringExclude
- private byte[] candsByKeyBytes;
- /** Rolled back versions with order higher than one for this message (needed for commit ordering). */
- @GridToStringInclude
- @GridDirectCollection(GridCacheVersion.class)
- private Collection<GridCacheVersion> rolledbackVers;
--
/** Count of keys referenced in candidates array (needed only locally for optimization). */
@GridToStringInclude
@GridDirectTransient
@@@ -228,13 -207,19 +173,7 @@@
writer.incrementState();
-- case 4:
- if (!writer.writeByteArray("candsByKeyBytes", candsByKeyBytes))
- if (!writer.writeCollection("committedVers", committedVers, MessageCollectionItemType.MSG))
-- return false;
--
-- writer.incrementState();
--
-- case 5:
- if (!writer.writeCollection("rolledbackVers", rolledbackVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
+ case 6:
if (!writer.writeMessage("ver", ver))
return false;
@@@ -264,15 -249,23 +203,7 @@@
reader.incrementState();
-- case 4:
- candsByKeyBytes = reader.readByteArray("candsByKeyBytes");
- committedVers = reader.readCollection("committedVers", MessageCollectionItemType.MSG);
--
-- if (!reader.isLastRead())
-- return false;
--
-- reader.incrementState();
--
-- case 5:
- rolledbackVers = reader.readCollection("rolledbackVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
+ case 6:
ver = reader.readMessage("ver");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 18ae318,7a84f9a..5d79f0a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@@ -94,9 -98,12 +91,9 @@@ public class GridDistributedTxFinishReq
boolean commit,
boolean invalidate,
boolean sys,
- GridIoPolicy plc,
+ byte plc,
boolean syncCommit,
boolean syncRollback,
- GridCacheVersion baseVer,
- Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers,
int txSize
) {
super(xidVer, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 1af96e9,e160529..5080e3a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@@ -747,9 -756,16 +754,9 @@@ public final class GridDhtLockFuture ex
try {
while (true) {
try {
- hasRmtNodes = cctx.dhtMap(
- nearNodeId,
- topVer,
- entry,
- tx == null ? lockVer : null,
- log,
- dhtMap,
- null);
+ cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, null);
- GridCacheMvccCandidate cand = entry.mappings(lockVer);
+ GridCacheMvccCandidate cand = entry.candidate(lockVer);
// Possible in case of lock cancellation.
if (cand == null) {
@@@ -850,22 -870,35 +858,33 @@@
boolean invalidateRdr = e.readerId(n.id()) != null;
- req.addDhtKey(
- e.key(),
- invalidateRdr,
- cctx);
+ req.addDhtKey(e.key(), invalidateRdr, cctx);
- if (needVal)
+ if (needVal) {
// Mark last added key as needed to be preloaded.
req.markLastKeyForPreload();
+
+ if (tx != null) {
+ IgniteTxEntry txEntry = tx.entry(e.txKey());
+
+ // NOOP entries will be sent to backups on prepare step.
+ if (txEntry.op() == GridCacheOperation.READ)
+ txEntry.op(GridCacheOperation.NOOP);
+ }
+ }
-
- it.set(addOwned(req, e));
}
- add(fut); // Append new future.
+ if (!F.isEmpty(req.keys())) {
+ if (tx != null)
+ tx.addLockTransactionNode(n);
- if (log.isDebugEnabled())
- log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
+ add(fut); // Append new future.
- cctx.io().send(n, req, cctx.ioPolicy());
+ if (log.isDebugEnabled())
+ log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
+
+ cctx.io().send(n, req, cctx.ioPolicy());
+ }
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index fa679ef,4f081bf..da14b9f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@@ -946,9 -1064,22 +1050,15 @@@ public abstract class GridDhtTransactio
try {
// Send reply back to originating near node.
GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
- req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ tx != null && tx.onePhaseCommit(),
+ entries.size(),
+ err,
+ null);
if (err == null) {
- res.pending(localDhtPendingVersions(entries, mappedVer));
-
- // We have to add completed versions for cases when nearLocal and remote transactions
- // execute concurrently.
- res.completedVersions(ctx.tm().committedVersions(req.version()),
- ctx.tm().rolledbackVersions(req.version()));
-
int i = 0;
for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext();) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index dc3eeac,9aa9c17..87fe6e7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@@ -313,8 -379,12 +379,8 @@@ public final class GridDhtTxFinishFutur
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- tx.syncCommit(),
- tx.syncRollback(),
+ sync,
+ sync,
- tx.completedBase(),
- tx.committedVersions(),
- tx.rolledbackVersions(),
- tx.pendingVersions(),
tx.size(),
tx.subjectId(),
tx.taskNameHash());
@@@ -362,8 -430,12 +428,8 @@@
tx.system(),
tx.ioPolicy(),
tx.isSystemInvalidate(),
- tx.syncCommit(),
- tx.syncRollback(),
+ sync,
+ sync,
- tx.completedBase(),
- tx.committedVersions(),
- tx.rolledbackVersions(),
- tx.pendingVersions(),
tx.size(),
tx.subjectId(),
tx.taskNameHash());
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 1a6883b,fe72b24..52fb87a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@@ -19,11 -19,8 +19,10 @@@ package org.apache.ignite.internal.proc
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.affinity.*;
- import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@@ -336,7 -335,15 +335,7 @@@ public class GridDhtTxFinishRequest ext
reader.incrementState();
- case 24:
- case 21:
- pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
+ case 22:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 1d8f2b8,b50a010..2b0e7c2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@@ -64,6 -62,18 +62,12 @@@ public abstract class GridDhtTxLocalAda
/** */
private long dhtThreadId;
+ /** */
+ protected boolean explicitLock;
+
- /** */
- private boolean needsCompletedVers;
-
- /** Versions of pending locks for entries of this tx. */
- private Collection<GridCacheVersion> pendingVers;
-
+ /** Nodes where transactions were started on lock step. */
+ private Set<ClusterNode> lockTxNodes;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1b097b6,fbc8c84..1cc3175
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -292,8 -291,12 +291,12 @@@ public final class GridDhtTxPrepareFutu
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
- if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+ if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
- cached.unswap(true, retVal);
+ cached.unswap(retVal);
+
+ boolean readThrough = (retVal || hasFilters) &&
+ cacheCtx.config().isLoadPreviousValue() &&
+ !txEntry.skipStore();
CacheObject val = cached.innerGet(
tx,
@@@ -937,15 -909,15 +947,17 @@@
for (IgniteTxEntry entry : nearMapping.writes()) {
try {
- GridCacheMvccCandidate added = entry.cached().candidate(version());
+ if (entry.explicitVersion() == null) {
+ GridCacheMvccCandidate added = entry.cached().candidate(version());
- assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
- "[added=" + added + ", entry=" + entry + ']';
+ assert added != null : "Null candidate for non-group-lock entry " +
+ "[added=" + added + ", entry=" + entry + ']';
+ assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+ "[added=" + added + ", entry=" + entry + ']';
- if (added != null && added.ownerVersion() != null)
- req.owned(entry.txKey(), added.ownerVersion());
+ if (added != null && added.ownerVersion() != null)
+ req.owned(entry.txKey(), added.ownerVersion());
+ }
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 49c1fb6,221b230..8202d10
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -581,8 -581,20 +578,18 @@@ public class GridDhtColocatedCache<K, V
GridDistributedUnlockRequest req = mapping.getValue();
if (!F.isEmpty(req.keys())) {
- // We don't wait for reply to this message.
- ctx.io().send(n, req, ctx.ioPolicy());
- req.completedVersions(committed, rolledback);
-
+ try {
+ // We don't wait for reply to this message.
+ ctx.io().send(n, req, ctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() +
+ ", n=" + n + ", e=" + e + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ce614cb,3d28018..d264de1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -1015,9 -1074,10 +1074,9 @@@ public final class GridNearLockFuture e
// Lock is held at this point, so we can set the
// returned value if any.
- entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer.get());
- entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
- res.rolledbackVersions(), res.pending());
+ entry.readyNearLock(lockVer, mappedVer);
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
@@@ -1317,111 -1381,132 +1380,128 @@@
return;
}
- int i = 0;
+ if (res.clientRemapVersion() != null) {
+ assert cctx.kernalContext().clientNode();
- AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+ IgniteInternalFuture<?> affFut =
+ cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
- for (KeyCacheObject k : keys) {
- while (true) {
- GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+ if (affFut != null && !affFut.isDone()) {
+ affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ remap();
+ }
+ });
+ }
+ else
+ remap();
+ }
+ else {
+ int i = 0;
- try {
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
+ AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
- return;
- }
+ for (KeyCacheObject k : keys) {
+ while (true) {
+ GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
- IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+ try {
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
- CacheObject oldVal = entry.rawGet();
- boolean hasOldVal = false;
- CacheObject newVal = res.value(i);
+ return;
+ }
- boolean readRecordable = false;
+ IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
- if (retval) {
- readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+ CacheObject oldVal = entry.rawGet();
+ boolean hasOldVal = false;
+ CacheObject newVal = res.value(i);
- if (readRecordable)
- hasOldVal = entry.hasValue();
- }
+ boolean readRecordable = false;
+
+ if (retval) {
+ readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+
+ if (readRecordable)
+ hasOldVal = entry.hasValue();
+ }
- GridCacheVersion dhtVer = res.dhtVersion(i);
- GridCacheVersion mappedVer = res.mappedVersion(i);
+ GridCacheVersion dhtVer = res.dhtVersion(i);
+ GridCacheVersion mappedVer = res.mappedVersion(i);
- if (newVal == null) {
- if (oldValTup != null) {
- if (oldValTup.get1().equals(dhtVer))
- newVal = oldValTup.get2();
+ if (newVal == null) {
+ if (oldValTup != null) {
+ if (oldValTup.get1().equals(dhtVer))
+ newVal = oldValTup.get2();
- oldVal = oldValTup.get2();
+ oldVal = oldValTup.get2();
+ }
}
- }
- // Lock is held at this point, so we can set the
- // returned value if any.
- entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id());
+ // Lock is held at this point, so we can set the
+ // returned value if any.
+ entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
- if (inTx() && implicitTx() && tx.onePhaseCommit()) {
- boolean pass = res.filterResult(i);
+ if (inTx()) {
+ tx.hasRemoteLocks(true);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
- }
+ if (implicitTx() && tx.onePhaseCommit()) {
+ boolean pass = res.filterResult(i);
- entry.readyNearLock(lockVer, mappedVer);
-
- if (retval) {
- if (readRecordable)
- cctx.events().addEvent(
- entry.partition(),
- entry.key(),
- tx,
- null,
- EVT_CACHE_OBJECT_READ,
- newVal,
- newVal != null,
- oldVal,
- hasOldVal,
- CU.subjectId(tx, cctx.shared()),
- null,
- inTx() ? tx.resolveTaskName() : null);
-
- if (cctx.cache().configuration().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(false);
- }
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
+ }
+ }
-
- entry.readyNearLock(lockVer,
- mappedVer,
- res.committedVersions(),
- res.rolledbackVersions(),
- res.pending());
++
++ entry.readyNearLock(lockVer, mappedVer);
+
+ if (retval) {
+ if (readRecordable)
+ cctx.events().addEvent(
+ entry.partition(),
+ entry.key(),
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ newVal,
+ newVal != null,
+ oldVal,
+ hasOldVal,
+ CU.subjectId(tx, cctx.shared()),
+ null,
+ inTx() ? tx.resolveTaskName() : null);
+
+ if (cctx.cache().configuration().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(false);
+ }
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- break; // Inner while loop.
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to add candidates because entry was removed (will renew).");
+ break; // Inner while loop.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to add candidates because entry was removed (will renew).");
- // Replace old entry with new one.
- entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ // Replace old entry with new one.
+ entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+ }
}
- catch (IgniteCheckedException e) {
- onDone(e);
- return;
- }
+ i++;
}
- i++;
- }
+ try {
+ proceedMapping(mappings);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
- try {
- proceedMapping(mappings);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ onDone(true);
}
-
- onDone(true);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 1e68d5e,4776963..081c7fc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@@ -347,7 -381,13 +364,13 @@@ public class GridNearLockRequest extend
writer.incrementState();
- case 23:
+ case 21:
+ if (!writer.writeBoolean("firstClientReq", firstClientReq))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
if (!writer.writeBoolean("hasTransforms", hasTransforms))
return false;
@@@ -371,7 -411,13 +394,7 @@@
writer.incrementState();
- case 30:
- case 28:
- if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
- return false;
-
- writer.incrementState();
-
+ case 29:
if (!writer.writeBoolean("retVal", retVal))
return false;
@@@ -473,7 -527,23 +504,15 @@@
reader.incrementState();
- case 25:
- case 28:
- onePhaseCommit = reader.readBoolean("onePhaseCommit");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
+ case 29:
+ retVal = reader.readBoolean("retVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 30:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index 1fefa4c,62002dc..3b4f7e5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@@ -18,6 -18,8 +18,7 @@@
package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*;
@@@ -90,6 -104,31 +97,13 @@@ public class GridNearLockResponse exten
}
/**
+ * @return Topology version the client node should remap lock request to, or {@code null} if remap is not needed.
+ */
+ @Nullable public AffinityTopologyVersion clientRemapVersion() {
+ return clientRemapVer;
+ }
+
+ /**
- * Gets pending versions that are less than {@link #version()}.
- *
- * @return Pending versions.
- */
- public Collection<GridCacheVersion> pending() {
- return pending;
- }
-
- /**
- * Sets pending versions that are less than {@link #version()}.
- *
- * @param pending Pending versions.
- */
- public void pending(Collection<GridCacheVersion> pending) {
- this.pending = pending;
- }
-
- /**
* @return Mini future ID.
*/
public IgniteUuid miniId() {