You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/06 11:39:48 UTC
incubator-ignite git commit: # ignite-157-2 renamings
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-157-2 b141abfd5 -> f5f95fb8c
# ignite-157-2 renamings
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f5f95fb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f5f95fb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f5f95fb8
Branch: refs/heads/ignite-157-2
Commit: f5f95fb8c952996f4479852b1ca2e086d3d57621
Parents: b141abf
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 09:56:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 09:56:30 2015 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 4 +-
.../communication/GridIoMessageFactory.java | 4 +-
...ridCacheOptimisticCheckPreparedTxFuture.java | 508 -------------------
...idCacheOptimisticCheckPreparedTxRequest.java | 261 ----------
...dCacheOptimisticCheckPreparedTxResponse.java | 179 -------
.../distributed/GridCacheTxRecoveryFuture.java | 506 ++++++++++++++++++
.../distributed/GridCacheTxRecoveryRequest.java | 261 ++++++++++
.../GridCacheTxRecoveryResponse.java | 182 +++++++
.../cache/transactions/IgniteTxHandler.java | 30 +-
.../cache/transactions/IgniteTxManager.java | 98 +---
.../resources/META-INF/classnames.properties | 6 +-
11 files changed, 976 insertions(+), 1063 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index e37b4f3..0540148 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -165,8 +165,8 @@ public class MessageCodeGenerator {
// gen.generateAndWrite(GridDhtTxFinishRequest.class);
// gen.generateAndWrite(GridDhtTxFinishResponse.class);
//
-// gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxRequest.class);
-// gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxResponse.class);
+// gen.generateAndWrite(GridCacheTxRecoveryRequest.class);
+// gen.generateAndWrite(GridCacheTxRecoveryResponse.class);
// gen.generateAndWrite(GridQueryCancelRequest.class);
// gen.generateAndWrite(GridQueryFailResponse.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a395747..7fe8da8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -166,12 +166,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 16:
- msg = new GridCacheOptimisticCheckPreparedTxRequest();
+ msg = new GridCacheTxRecoveryRequest();
break;
case 17:
- msg = new GridCacheOptimisticCheckPreparedTxResponse();
+ msg = new GridCacheTxRecoveryResponse();
break;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
deleted file mode 100644
index bd3e1cc..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Future verifying that all remote transactions related to some
- * optimistic transaction were prepared.
- */
-public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
- implements GridCacheFuture<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- private static IgniteLogger log;
-
- /** Trackable flag. */
- private boolean trackable = true;
-
- /** Context. */
- private final GridCacheSharedContext<K, V> cctx;
-
- /** Future ID. */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Transaction. */
- private final IgniteInternalTx tx;
-
- /** All involved nodes. */
- private final Map<UUID, ClusterNode> nodes;
-
- /** ID of failed node started transaction. */
- private final UUID failedNodeId;
-
- /** Transaction nodes mapping. */
- private final Map<UUID, Collection<UUID>> txNodes;
-
- /** */
- private final boolean nearTxCheck;
-
- /**
- * @param cctx Context.
- * @param tx Transaction.
- * @param failedNodeId ID of failed node started transaction.
- * @param txNodes Transaction mapping.
- */
- @SuppressWarnings("ConstantConditions")
- public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx,
- IgniteInternalTx tx,
- UUID failedNodeId,
- Map<UUID, Collection<UUID>> txNodes)
- {
- super(cctx.kernalContext(), CU.boolReducer());
-
- this.cctx = cctx;
- this.tx = tx;
- this.txNodes = txNodes;
- this.failedNodeId = failedNodeId;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridCacheOptimisticCheckPreparedTxFuture.class);
-
- nodes = new GridLeanMap<>();
-
- UUID locNodeId = cctx.localNodeId();
-
- for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
- if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
- ClusterNode node = cctx.discovery().node(e.getKey());
-
- if (node != null)
- nodes.put(node.id(), node);
- else if (log.isDebugEnabled())
- log.debug("Transaction node left (will ignore) " + e.getKey());
- }
-
- for (UUID nodeId : e.getValue()) {
- if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
- ClusterNode node = cctx.discovery().node(nodeId);
-
- if (node != null)
- nodes.put(node.id(), node);
- else if (log.isDebugEnabled())
- log.debug("Transaction node left (will ignore) " + e.getKey());
- }
- }
- }
-
- UUID nearNodeId = tx.eventNodeId();
-
- nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
- }
-
- /**
- * Initializes future.
- */
- @SuppressWarnings("ConstantConditions")
- public void prepare() {
- if (nearTxCheck) {
- UUID nearNodeId = tx.eventNodeId();
-
- if (cctx.localNodeId().equals(nearNodeId)) {
- IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
-
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut) {
- try {
- onDone(fut.get());
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
- });
- }
- else {
- MiniFuture fut = new MiniFuture(tx.eventNodeId());
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx,
- 0,
- true,
- futureId(),
- fut.futureId());
-
- try {
- cctx.io().send(nearNodeId, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException e) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
-
- markInitialized();
- }
-
- return;
- }
-
- // First check transactions on local node.
- int locTxNum = nodeTransactions(cctx.localNodeId());
-
- if (locTxNum > 1) {
- IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
-
- if (fut == null || fut.isDone()) {
- boolean prepared;
-
- try {
- prepared = fut == null ? true : fut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Check prepared transaction future failed: " + e, e);
-
- prepared = false;
- }
-
- if (!prepared) {
- onDone(false);
-
- markInitialized();
-
- return;
- }
- }
- else {
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut) {
- boolean prepared;
-
- try {
- prepared = fut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Check prepared transaction future failed: " + e, e);
-
- prepared = false;
- }
-
- if (!prepared) {
- onDone(false);
-
- markInitialized();
- }
- else
- proceedPrepare();
- }
- });
-
- return;
- }
- }
-
- proceedPrepare();
- }
-
- /**
- * Process prepare after local check.
- */
- private void proceedPrepare() {
- for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
- UUID nodeId = entry.getKey();
-
- // Skip left nodes and local node.
- if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
- continue;
-
- /*
- * If primary node failed then send message to all backups, otherwise
- * send message only to primary node.
- */
-
- if (nodeId.equals(failedNodeId)) {
- for (UUID id : entry.getValue()) {
- // Skip backup node if it is local node or if it is also was mapped as primary.
- if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
- continue;
-
- MiniFuture fut = new MiniFuture(id);
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
- nodeTransactions(id),
- false,
- futureId(),
- fut.futureId());
-
- try {
- cctx.io().send(id, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
-
- break;
- }
- }
- }
- else {
- MiniFuture fut = new MiniFuture(nodeId);
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx,
- nodeTransactions(nodeId),
- false,
- futureId(),
- fut.futureId());
-
- try {
- cctx.io().send(nodeId, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
-
- break;
- }
- }
- }
-
- markInitialized();
- }
-
- /**
- * @param nodeId Node ID.
- * @return Number of transactions on node.
- */
- private int nodeTransactions(UUID nodeId) {
- int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
-
- for (Collection<UUID> backups : txNodes.values()) {
- for (UUID backup : backups) {
- if (backup.equals(nodeId)) {
- cnt++; // +1 if node is backup.
-
- break;
- }
- }
- }
-
- return cnt;
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- public void onResult(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
- if (!isDone()) {
- for (IgniteInternalFuture<Boolean> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.futureId().equals(res.miniId())) {
- assert f.nodeId().equals(nodeId);
-
- f.onResult(res);
-
- break;
- }
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return nodes.values();
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- for (IgniteInternalFuture<?> fut : futures())
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.nodeId().equals(nodeId)) {
- f.onNodeLeft();
-
- return true;
- }
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- trackable = false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
- if (super.onDone(res, err)) {
- cctx.mvcc().removeFuture(this);
-
- if (err == null) {
- assert res != null;
-
- cctx.tm().finishOptimisticTxOnRecovery(tx, res);
- }
- else {
- if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
- if (log.isDebugEnabled())
- log.debug("Failed to check transaction on near node, " +
- "ignoring [err=" + err + ", tx=" + tx + ']');
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to check prepared transactions, " +
- "invalidating transaction [err=" + err + ", tx=" + tx + ']');
-
- cctx.tm().salvageTx(tx);
- }
- }
- }
-
- return false;
- }
-
- /**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
- }
-
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxFuture.class, this, "super", super.toString());
- }
-
- /**
- *
- */
- private class MiniFuture extends GridFutureAdapter<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Mini future ID. */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Node ID. */
- private UUID nodeId;
-
- /**
- * @param nodeId Node ID.
- */
- private MiniFuture(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Node ID.
- */
- private UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Future ID.
- */
- private IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @param e Error.
- */
- private void onError(Throwable e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- onDone(e);
- }
-
- /**
- */
- private void onNodeLeft() {
- if (log.isDebugEnabled())
- log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
-
- if (nearTxCheck) {
- // Near and originating nodes left, need initiate tx check.
- cctx.tm().commitIfPrepared(tx);
-
- onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
- }
- else
- onDone(true);
- }
-
- /**
- * @param res Result callback.
- */
- private void onResult(GridCacheOptimisticCheckPreparedTxResponse res) {
- onDone(res.success());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
deleted file mode 100644
index 4f2a1d6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Message sent to check that transactions related to transaction were prepared on remote node.
- */
-public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Near transaction ID. */
- private GridCacheVersion nearXidVer;
-
- /** Expected number of transactions on node. */
- private int txNum;
-
- /** System transaction flag. */
- private boolean sys;
-
- /** {@code True} if should check only tx on near node. */
- private boolean nearTxCheck;
-
- /**
- * Empty constructor required by {@link Externalizable}
- */
- public GridCacheOptimisticCheckPreparedTxRequest() {
- // No-op.
- }
-
- /**
- * @param tx Transaction.
- * @param txNum Expected number of transactions on remote node.
- * @param nearTxCheck
- * @param futId Future ID.
- * @param miniId Mini future ID.
- */
- public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx,
- int txNum,
- boolean nearTxCheck,
- IgniteUuid futId,
- IgniteUuid miniId)
- {
- super(tx.xidVersion(), 0);
-
- nearXidVer = tx.nearXidVersion();
- sys = tx.system();
-
- this.futId = futId;
- this.miniId = miniId;
- this.txNum = txNum;
- this.nearTxCheck = nearTxCheck;
- }
-
- /**
- * @return {@code True} if should check only tx on near node.
- */
- public boolean nearTxCheck() {
- return nearTxCheck;
- }
-
- /**
- * @return Near version.
- */
- public GridCacheVersion nearXidVersion() {
- return nearXidVer;
- }
-
- /**
- * @return Future ID.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public IgniteUuid miniId() {
- return miniId;
- }
-
- /**
- * @return Expected number of transactions on node.
- */
- public int transactions() {
- return txNum;
- }
-
- /**
- * @return System transaction flag.
- */
- public boolean system() {
- return sys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 8:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean("sys", sys))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeInt("txNum", txNum))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 8:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- nearTxCheck = reader.readBoolean("nearTxCheck");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- nearXidVer = reader.readMessage("nearXidVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- sys = reader.readBoolean("sys");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- txNum = reader.readInt("txNum");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 16;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 14;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxRequest.class, this, "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
deleted file mode 100644
index bc8c2e0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Check prepared transactions response.
- */
-public class GridCacheOptimisticCheckPreparedTxResponse extends GridDistributedBaseMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Flag indicating if all remote transactions were prepared. */
- private boolean success;
-
- /**
- * Empty constructor required by {@link Externalizable}
- */
- public GridCacheOptimisticCheckPreparedTxResponse() {
- // No-op.
- }
-
- /**
- * @param txId Transaction ID.
- * @param futId Future ID.
- * @param miniId Mini future ID.
- * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
- */
- public GridCacheOptimisticCheckPreparedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId,
- boolean success) {
- super(txId, 0);
-
- this.futId = futId;
- this.miniId = miniId;
- this.success = success;
- }
-
- /**
- * @return Future ID.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public IgniteUuid miniId() {
- return miniId;
- }
-
- /**
- * @return {@code True} if all remote transactions were prepared.
- */
- public boolean success() {
- return success;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 8:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeBoolean("success", success))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 8:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- success = reader.readBoolean("success");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 17;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 11;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxResponse.class, this, "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
new file mode 100644
index 0000000..663ed90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Future verifying that all remote transactions related to a given transaction were prepared or committed.
+ */
+public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Trackable flag. */
+ private boolean trackable = true;
+
+ /** Context. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** Future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Transaction. */
+ private final IgniteInternalTx tx;
+
+ /** All involved nodes. */
+ private final Map<UUID, ClusterNode> nodes;
+
+ /** ID of failed node started transaction. */
+ private final UUID failedNodeId;
+
+ /** Transaction nodes mapping. */
+ private final Map<UUID, Collection<UUID>> txNodes;
+
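+ /** {@code True} if the near node is not the failed node and is alive, so only the near node's transaction is checked. */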
+ private final boolean nearTxCheck;
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ * @param failedNodeId ID of the failed node that started the transaction.
+ * @param txNodes Transaction nodes mapping (keys are used as primary node IDs, values as their backup node IDs).
+ */
+ @SuppressWarnings("ConstantConditions")
+ public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
+ IgniteInternalTx tx,
+ UUID failedNodeId,
+ Map<UUID, Collection<UUID>> txNodes)
+ {
+ super(cctx.kernalContext(), CU.boolReducer());
+
+ this.cctx = cctx;
+ this.tx = tx;
+ this.txNodes = txNodes;
+ this.failedNodeId = failedNodeId;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
+
+ nodes = new GridLeanMap<>();
+
+ UUID locNodeId = cctx.localNodeId();
+
+ for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) { // Collect alive remote participants.
+ if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
+ ClusterNode node = cctx.discovery().node(e.getKey());
+
+ if (node != null)
+ nodes.put(node.id(), node);
+ else if (log.isDebugEnabled())
+ log.debug("Transaction node left (will ignore) " + e.getKey());
+ }
+
+ for (UUID nodeId : e.getValue()) {
+ if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ if (node != null)
+ nodes.put(node.id(), node);
+ else if (log.isDebugEnabled())
+ log.debug("Transaction node left (will ignore) " + nodeId); // Log the node that was looked up, not the entry key.
+ }
+ }
+ }
+
+ UUID nearNodeId = tx.eventNodeId();
+
+ nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId); // Near node differs from the failed node and is alive.
+ }
+
+ /**
+ * Initializes future: checks the transaction on the near node when {@code nearTxCheck} is set, otherwise checks local transactions and then remote nodes.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void prepare() {
+ if (nearTxCheck) {
+ UUID nearNodeId = tx.eventNodeId();
+
+ if (cctx.localNodeId().equals(nearNodeId)) { // Near node is the local node: query the local tx manager directly.
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else { // Near node is remote: send it a request with the nearTxCheck flag set.
+ MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ 0, // txNum
+ true, // nearTxCheck
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nearNodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+
+ markInitialized();
+ }
+
+ return;
+ }
+
+ // First check transactions on local node.
+ int locTxNum = nodeTransactions(cctx.localNodeId());
+
+ if (locTxNum > 1) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
+
+ if (fut == null || fut.isDone()) { // Result is available synchronously.
+ boolean prepared;
+
+ try {
+ prepared = fut == null ? true : fut.get(); // A null future is treated as "prepared".
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+
+ return;
+ }
+ }
+ else { // Result is not ready yet: continue from the listener once it completes.
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ boolean prepared;
+
+ try {
+ prepared = fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+ }
+ else
+ proceedPrepare();
+ }
+ });
+
+ return;
+ }
+ }
+
+ proceedPrepare();
+ }
+
+ /**
+ * Sends recovery requests to the remote nodes from the transaction mapping; called after the local check.
+ */
+ private void proceedPrepare() {
+ for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
+ UUID nodeId = entry.getKey();
+
+ // Skips an entry only if it is the local node AND absent from 'nodes'; other left nodes are not skipped here.
+ if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
+ continue;
+
+ /*
+ * If primary node failed then send message to all backups, otherwise
+ * send message only to primary node.
+ */
+
+ if (nodeId.equals(failedNodeId)) {
+ for (UUID id : entry.getValue()) {
+ // Skip backup node if it is the local node or if it was also mapped as primary.
+ if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
+ continue;
+
+ MiniFuture fut = new MiniFuture(id);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx,
+ nodeTransactions(id),
+ false,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(id, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break; // Exits only the backups loop; the outer loop over mapping entries continues.
+ }
+ }
+ }
+ else {
+ MiniFuture fut = new MiniFuture(nodeId);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ nodeTransactions(nodeId),
+ false,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break; // Exits the loop over mapping entries: no further requests are sent.
+ }
+ }
+ }
+
+ markInitialized();
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return Number of transactions on node: one if it is a key of the mapping, plus one per backup collection containing it.
+ */
+ private int nodeTransactions(UUID nodeId) {
+ int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
+
+ for (Collection<UUID> backups : txNodes.values()) {
+ for (UUID backup : backups) {
+ if (backup.equals(nodeId)) {
+ cnt++; // +1 for each backup collection that contains the node.
+
+ break; // Count each backup collection at most once.
+ }
+ }
+ }
+
+ return cnt;
+ }
+
+ /**
+ * @param nodeId Node ID (asserted to match the node of the mini-future the response belongs to).
+ * @param res Response; routed to the pending mini-future whose ID equals {@code res.miniId()}.
+ */
+ public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
+ if (!isDone()) { // Responses arriving after completion are ignored.
+ for (IgniteInternalFuture<Boolean> fut : pending()) {
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.nodeId().equals(nodeId);
+
+ f.onResult(res);
+
+ break; // At most one mini-future is notified per response.
+ }
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return tx.xidVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return nodes.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ for (IgniteInternalFuture<?> fut : futures()) // Only the first mini-future bound to the node is notified.
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.nodeId().equals(nodeId)) {
+ f.onNodeLeft();
+
+ return true; // A mini-future for the left node was found and notified.
+ }
+ }
+
+ return false; // No mini-future is bound to the left node.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ trackable = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) { // Post-completion actions run only if this call completed the future.
+ cctx.mvcc().removeFuture(this);
+
+ if (err == null) {
+ assert res != null;
+
+ cctx.tm().finishTxOnRecovery(tx, res); // 'res' is passed as the 'commit' flag.
+ }
+ else {
+ if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check transaction on near node, " +
+ "ignoring [err=" + err + ", tx=" + tx + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check prepared transactions, " +
+ "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+
+ cctx.tm().salvageTx(tx);
+ }
+ }
+ }
+
+ return false; // NOTE(review): false is returned even when super.onDone() succeeded - confirm no caller relies on the result.
+ }
+
+ /**
+ * @param f Future.
+ * @return {@code True} if the future's class is exactly {@link MiniFuture}.
+ */
+ private boolean isMini(IgniteInternalFuture<?> f) {
+ return f.getClass().equals(MiniFuture.class); // Exact class comparison, subclasses do not match.
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryFuture.class, this, "super", super.toString());
+ }
+
+ /**
+ * Mini-future created per node a recovery request is sent to; completed by the response, an error or the node leaving.
+ */
+ private class MiniFuture extends GridFutureAdapter<Boolean> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Mini future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /**
+ * @param nodeId Node ID.
+ */
+ private MiniFuture(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ private UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ private IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @param e Error this mini-future is completed with.
+ */
+ private void onError(Throwable e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+ onDone(e);
+ }
+
+ /** Completes this mini-future after its node has left the grid.
+ */
+ private void onNodeLeft() {
+ if (log.isDebugEnabled())
+ log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
+
+ if (nearTxCheck) {
+ // Near and originating nodes left, need to initiate tx check.
+ cctx.tm().commitIfPrepared(tx);
+
+ onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+ }
+ else
+ onDone(true); // The left node is ignored: this mini-future completes with 'true'.
+ }
+
+ /**
+ * @param res Response whose {@code success()} flag becomes the result of this mini-future.
+ */
+ private void onResult(GridCacheTxRecoveryResponse res) {
+ onDone(res.success());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
new file mode 100644
index 0000000..259c288
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Message sent to check that transactions related to a given transaction were prepared on a remote node.
+ */
+public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Near transaction ID. */
+ private GridCacheVersion nearXidVer;
+
+ /** Expected number of transactions on node. */
+ private int txNum;
+
+ /** System transaction flag. */
+ private boolean sys;
+
+ /** {@code True} if should check only tx on near node. */
+ private boolean nearTxCheck;
+
+ /**
+ * Empty constructor required by {@link Externalizable}
+ */
+ public GridCacheTxRecoveryRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param tx Transaction.
+ * @param txNum Expected number of transactions on remote node.
+ * @param nearTxCheck {@code True} if should check only tx on near node.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ */
+ public GridCacheTxRecoveryRequest(IgniteInternalTx tx,
+ int txNum,
+ boolean nearTxCheck,
+ IgniteUuid futId,
+ IgniteUuid miniId)
+ {
+ super(tx.xidVersion(), 0);
+
+ nearXidVer = tx.nearXidVersion();
+ sys = tx.system();
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.txNum = txNum;
+ this.nearTxCheck = nearTxCheck;
+ }
+
+ /**
+ * @return {@code True} if should check only tx on near node.
+ */
+ public boolean nearTxCheck() {
+ return nearTxCheck;
+ }
+
+ /**
+ * @return Near version.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Expected number of transactions on node.
+ */
+ public int transactions() {
+ return txNum;
+ }
+
+ /**
+ * @return System transaction flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer)) // Superclass part is written first.
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) { // Cases intentionally fall through: writing resumes at the current state and continues with the remaining fields.
+ case 8:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeBoolean("sys", sys))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeInt("txNum", txNum))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true; // All fields of this class have been written.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader)) // Superclass part is read first.
+ return false;
+
+ switch (reader.state()) { // Cases intentionally fall through; fields are read in the same order writeTo() writes them.
+ case 8:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ nearTxCheck = reader.readBoolean("nearTxCheck");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ nearXidVer = reader.readMessage("nearXidVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ sys = reader.readBoolean("sys");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ txNum = reader.readInt("txNum");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true; // All fields of this class have been read.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 16; // NOTE(review): presumably the type code this message is registered under in GridIoMessageFactory - confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 14; // This class uses states 8-13 (6 fields); states 0-7 presumably belong to the superclass - confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
new file mode 100644
index 0000000..e5c026a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Transactions recovery check response.
+ */
+public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Flag indicating if all remote transactions were prepared. */
+ private boolean success;
+
+ /**
+ * Empty constructor required by {@link Externalizable}
+ */
+ public GridCacheTxRecoveryResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param txId Transaction ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
+ */
+ public GridCacheTxRecoveryResponse(GridCacheVersion txId,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ boolean success)
+ {
+ super(txId, 0);
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.success = success;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return {@code True} if all remote transactions were prepared.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer)) // Superclass part is written first.
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) { // Cases intentionally fall through: writing resumes at the current state and continues with the remaining fields.
+ case 8:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("success", success))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true; // All fields of this class have been written.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader)) // Superclass part is read first.
+ return false;
+
+ switch (reader.state()) { // Cases intentionally fall through; fields are read in the same order writeTo() writes them.
+ case 8:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ success = reader.readBoolean("success");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true; // All fields of this class have been read.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 17; // NOTE(review): presumably the type code this message is registered under in GridIoMessageFactory - confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 11; // This class uses states 8-10 (3 fields); states 0-7 presumably belong to the superclass - confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryResponse.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 2897e30..af75fb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -119,16 +119,16 @@ public class IgniteTxHandler {
}
});
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) {
+ ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class,
+ new CI2<UUID, GridCacheTxRecoveryRequest>() {
+ @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) {
processCheckPreparedTxRequest(nodeId, req);
}
});
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+ ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class,
+ new CI2<UUID, GridCacheTxRecoveryResponse>() {
+ @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) {
processCheckPreparedTxResponse(nodeId, res);
}
});
@@ -138,6 +138,7 @@ public class IgniteTxHandler {
* @param nearNodeId Near node ID that initiated transaction.
* @param locTx Optional local transaction.
* @param req Near prepare request.
+ * @param completeCb Completion callback.
* @return Future for transaction.
*/
public IgniteInternalFuture<IgniteInternalTx> prepareTx(
@@ -170,6 +171,7 @@ public class IgniteTxHandler {
*
* @param locTx Local transaction.
* @param req Near prepare request.
+ * @param completeCb Completion callback.
* @return Prepare future.
*/
private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx(
@@ -177,7 +179,6 @@ public class IgniteTxHandler {
final GridNearTxPrepareRequest req,
final IgniteInClosure<GridNearTxPrepareResponse> completeCb
) {
-
IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
return new GridEmbeddedFuture<>(
@@ -223,6 +224,7 @@ public class IgniteTxHandler {
*
* @param nearNodeId Near node ID that initiated transaction.
* @param req Near prepare request.
+ * @param completeCb Completion callback.
* @return Prepare future.
*/
private IgniteInternalFuture<IgniteInternalTx> prepareNearTx(
@@ -442,6 +444,7 @@ public class IgniteTxHandler {
/**
* @param nodeId Node ID.
+ * @param locTx Local transaction.
* @param req Request.
* @return Future.
*/
@@ -1099,6 +1102,7 @@ public class IgniteTxHandler {
}
/**
+ * @param cacheCtx Context.
* @param key Key
* @param ver Version.
* @throws IgniteCheckedException If invalidate failed.
@@ -1183,7 +1187,7 @@ public class IgniteTxHandler {
* @param req Request.
*/
protected void processCheckPreparedTxRequest(final UUID nodeId,
- final GridCacheOptimisticCheckPreparedTxRequest req)
+ final GridCacheTxRecoveryRequest req)
{
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
@@ -1231,10 +1235,10 @@ public class IgniteTxHandler {
* @param prepared {@code True} if all transaction prepared or committed.
*/
private void sendCheckPreparedResponse(UUID nodeId,
- GridCacheOptimisticCheckPreparedTxRequest req,
+ GridCacheTxRecoveryRequest req,
boolean prepared) {
- GridCacheOptimisticCheckPreparedTxResponse res =
- new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared);
+ GridCacheTxRecoveryResponse res =
+ new GridCacheTxRecoveryResponse(req.version(), req.futureId(), req.miniId(), prepared);
try {
if (log.isDebugEnabled())
@@ -1256,11 +1260,11 @@ public class IgniteTxHandler {
* @param nodeId Node ID.
* @param res Response.
*/
- protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+ protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) {
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
- GridCacheOptimisticCheckPreparedTxFuture fut = (GridCacheOptimisticCheckPreparedTxFuture)ctx.mvcc().
+ GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().
<Boolean>future(res.version(), res.futureId());
if (fut == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 85b3ad0..8a1d490 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1931,40 +1931,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Gets local transaction for pessimistic tx recovery.
- *
- * @param nearXidVer Near tx ID.
- * @return Near local or colocated local transaction.
- */
- @Nullable public IgniteInternalTx localTxForRecovery(GridCacheVersion nearXidVer, boolean markFinalizing) {
- // First check if we have near transaction with this ID.
- IgniteInternalTx tx = idMap.get(nearXidVer);
-
- if (tx == null) {
- // Check all local transactions and mark them as waiting for recovery to prevent finish race.
- for (IgniteInternalTx txEx : idMap.values()) {
- if (nearXidVer.equals(txEx.nearXidVersion())) {
- if (!markFinalizing || !txEx.markFinalizing(RECOVERY_WAIT))
- tx = txEx;
- }
- }
- }
-
- // Either we found near transaction or one of transactions is being committed by user.
- // Wait for it and send reply.
- if (tx != null && tx.local())
- return tx;
-
- return null;
- }
-
- /**
* Commits or rolls back prepared transaction.
*
* @param tx Transaction.
* @param commit Whether transaction should be committed or rolled back.
*/
- public void finishOptimisticTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
+ public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
if (log.isDebugEnabled())
log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']');
@@ -1989,71 +1961,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Commits or rolls back pessimistic transaction.
- *
- * @param tx Transaction to finish.
- * @param commitInfo Commit information.
- */
- public void finishPessimisticTxOnRecovery(final IgniteInternalTx tx, GridCacheCommittedTxInfo commitInfo) {
- if (!tx.markFinalizing(RECOVERY_FINISH)) {
- if (log.isDebugEnabled())
- log.debug("Will not try to finish pessimistic transaction (could not mark as finalizing): " + tx);
-
- return;
- }
-
- if (tx instanceof GridDistributedTxRemoteAdapter) {
- IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
- rmtTx.doneRemote(tx.xidVersion(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList(),
- Collections.<GridCacheVersion>emptyList());
- }
-
- try {
- tx.prepare();
-
- if (commitInfo != null) {
- for (IgniteTxEntry entry : commitInfo.recoveryWrites()) {
- IgniteTxEntry write = tx.writeMap().get(entry.txKey());
-
- if (write != null) {
- GridCacheEntryEx cached = write.cached();
-
- IgniteTxEntry recovered = entry.cleanCopy(write.context());
-
- if (cached == null || cached.detached())
- cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion());
-
- recovered.cached(cached);
-
- tx.writeMap().put(entry.txKey(), recovered);
-
- continue;
- }
-
- // If write was not found, check read.
- IgniteTxEntry read = tx.readMap().remove(entry.txKey());
-
- if (read != null)
- tx.writeMap().put(entry.txKey(), entry);
- }
-
- tx.commitAsync().listen(new CommitListener(tx));
- }
- else
- tx.rollbackAsync();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to prepare pessimistic transaction (will invalidate): " + tx, e);
-
- salvageTx(tx);
- }
- }
-
- /**
- * Commits optimistic transaction in case when node started transaction failed, but all related
+ * Commits transaction in case when node started transaction failed, but all related
* transactions were prepared (invalidates transaction if it is not fully prepared).
*
* @param tx Transaction.
@@ -2063,7 +1971,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert !F.isEmpty(tx.transactionNodes()) : tx;
assert tx.nearXidVersion() != null : tx;
- GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
+ GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
cctx,
tx,
tx.originatingNodeId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 35495ed..657f4af 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -455,9 +455,9 @@ org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresMa
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$RemoveSetDataCallable
org.apache.ignite.internal.processors.cache.distributed.GridCacheCommittedTxInfo
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxFuture$1
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxRequest
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxResponse
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture$1
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest
org.apache.ignite.internal.processors.cache.distributed.GridDistributedBaseMessage
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter