You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:33:08 UTC
[19/24] incubator-ignite git commit: ignite-545: merge from
ignite-sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
deleted file mode 100644
index 8a14b48..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Future verifying that all remote transactions related to some
- * optimistic transaction were prepared.
- */
-public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
- implements GridCacheFuture<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- private static IgniteLogger log;
-
- /** Trackable flag. */
- private boolean trackable = true;
-
- /** Context. */
- private final GridCacheSharedContext<K, V> cctx;
-
- /** Future ID. */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Transaction. */
- private final IgniteInternalTx tx;
-
- /** All involved nodes. */
- private final Map<UUID, ClusterNode> nodes;
-
- /** ID of failed node started transaction. */
- private final UUID failedNodeId;
-
- /** Transaction nodes mapping. */
- private final Map<UUID, Collection<UUID>> txNodes;
-
- /**
- * @param cctx Context.
- * @param tx Transaction.
- * @param failedNodeId ID of failed node started transaction.
- * @param txNodes Transaction mapping.
- */
- @SuppressWarnings("ConstantConditions")
- public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx, IgniteInternalTx tx,
- UUID failedNodeId, Map<UUID, Collection<UUID>> txNodes) {
- super(cctx.kernalContext(), CU.boolReducer());
-
- this.cctx = cctx;
- this.tx = tx;
- this.txNodes = txNodes;
- this.failedNodeId = failedNodeId;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridCacheOptimisticCheckPreparedTxFuture.class);
-
- nodes = new GridLeanMap<>();
-
- UUID locNodeId = cctx.localNodeId();
-
- for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
- if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
- ClusterNode node = cctx.discovery().node(e.getKey());
-
- if (node != null)
- nodes.put(node.id(), node);
- else if (log.isDebugEnabled())
- log.debug("Transaction node left (will ignore) " + e.getKey());
- }
-
- for (UUID nodeId : e.getValue()) {
- if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
- ClusterNode node = cctx.discovery().node(nodeId);
-
- if (node != null)
- nodes.put(node.id(), node);
- else if (log.isDebugEnabled())
- log.debug("Transaction node left (will ignore) " + e.getKey());
- }
- }
- }
- }
-
- /**
- * Initializes future.
- */
- @SuppressWarnings("ConstantConditions")
- public void prepare() {
- // First check transactions on local node.
- int locTxNum = nodeTransactions(cctx.localNodeId());
-
- if (locTxNum > 1) {
- IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
-
- if (fut == null || fut.isDone()) {
- boolean prepared;
-
- try {
- prepared = fut == null ? true : fut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Check prepared transaction future failed: " + e, e);
-
- prepared = false;
- }
-
- if (!prepared) {
- onDone(false);
-
- markInitialized();
-
- return;
- }
- }
- else {
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut) {
- boolean prepared;
-
- try {
- prepared = fut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Check prepared transaction future failed: " + e, e);
-
- prepared = false;
- }
-
- if (!prepared) {
- onDone(false);
-
- markInitialized();
- }
- else
- proceedPrepare();
- }
- });
-
- return;
- }
- }
-
- proceedPrepare();
- }
-
- /**
- * Process prepare after local check.
- */
- private void proceedPrepare() {
- for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
- UUID nodeId = entry.getKey();
-
- // Skip left nodes and local node.
- if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
- continue;
-
- /*
- * If primary node failed then send message to all backups, otherwise
- * send message only to primary node.
- */
-
- if (nodeId.equals(failedNodeId)) {
- for (UUID id : entry.getValue()) {
- // Skip backup node if it is local node or if it is also was mapped as primary.
- if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
- continue;
-
- MiniFuture fut = new MiniFuture(id);
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
- nodeTransactions(id),
- futureId(),
- fut.futureId());
-
- try {
- cctx.io().send(id, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
-
- break;
- }
- }
- }
- else {
- MiniFuture fut = new MiniFuture(nodeId);
-
- add(fut);
-
- GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx, nodeTransactions(nodeId), futureId(), fut.futureId());
-
- try {
- cctx.io().send(nodeId, req, tx.ioPolicy());
- }
- catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
-
- break;
- }
- }
- }
-
- markInitialized();
- }
-
- /**
- * @param nodeId Node ID.
- * @return Number of transactions on node.
- */
- private int nodeTransactions(UUID nodeId) {
- int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
-
- for (Collection<UUID> backups : txNodes.values()) {
- for (UUID backup : backups) {
- if (backup.equals(nodeId)) {
- cnt++; // +1 if node is backup.
-
- break;
- }
- }
- }
-
- return cnt;
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- public void onResult(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
- if (!isDone()) {
- for (IgniteInternalFuture<Boolean> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.futureId().equals(res.miniId())) {
- assert f.nodeId().equals(nodeId);
-
- f.onResult(res);
-
- break;
- }
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return nodes.values();
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- for (IgniteInternalFuture<?> fut : futures())
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.nodeId().equals(nodeId)) {
- f.onNodeLeft();
-
- return true;
- }
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- trackable = false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
- if (super.onDone(res, err)) {
- cctx.mvcc().removeFuture(this);
-
- if (err == null) {
- assert res != null;
-
- cctx.tm().finishOptimisticTxOnRecovery(tx, res);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to check prepared transactions, " +
- "invalidating transaction [err=" + err + ", tx=" + tx + ']');
-
- cctx.tm().salvageTx(tx);
- }
- }
-
- return false;
- }
-
- /**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
- }
-
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxFuture.class, this, "super", super.toString());
- }
-
- /**
- *
- */
- private class MiniFuture extends GridFutureAdapter<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Mini future ID. */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Node ID. */
- private UUID nodeId;
-
- /**
- * @param nodeId Node ID.
- */
- private MiniFuture(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Node ID.
- */
- private UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Future ID.
- */
- private IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @param e Error.
- */
- private void onError(Throwable e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- onDone(e);
- }
-
- /**
- */
- private void onNodeLeft() {
- if (log.isDebugEnabled())
- log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
-
- onDone(true);
- }
-
- /**
- * @param res Result callback.
- */
- private void onResult(GridCacheOptimisticCheckPreparedTxResponse res) {
- onDone(res.success());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
deleted file mode 100644
index e83db66..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Message sent to check that transactions related to some optimistic transaction
- * were prepared on remote node.
- */
-public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Near transaction ID. */
- private GridCacheVersion nearXidVer;
-
- /** Expected number of transactions on node. */
- private int txNum;
-
- /** System transaction flag. */
- private boolean sys;
-
- /**
- * Empty constructor required by {@link Externalizable}
- */
- public GridCacheOptimisticCheckPreparedTxRequest() {
- // No-op.
- }
-
- /**
- * @param tx Transaction.
- * @param txNum Expected number of transactions on remote node.
- * @param futId Future ID.
- * @param miniId Mini future ID.
- */
- public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, int txNum, IgniteUuid futId,
- IgniteUuid miniId) {
- super(tx.xidVersion(), 0);
-
- nearXidVer = tx.nearXidVersion();
- sys = tx.system();
-
- this.futId = futId;
- this.miniId = miniId;
- this.txNum = txNum;
- }
-
- /**
- * @return Near version.
- */
- public GridCacheVersion nearXidVersion() {
- return nearXidVer;
- }
-
- /**
- * @return Future ID.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public IgniteUuid miniId() {
- return miniId;
- }
-
- /**
- * @return Expected number of transactions on node.
- */
- public int transactions() {
- return txNum;
- }
-
- /**
- * @return System transaction flag.
- */
- public boolean system() {
- return sys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 8:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeBoolean("sys", sys))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeInt("txNum", txNum))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 8:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- nearXidVer = reader.readMessage("nearXidVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- sys = reader.readBoolean("sys");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- txNum = reader.readInt("txNum");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 16;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 13;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxRequest.class, this, "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
deleted file mode 100644
index bc8c2e0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Check prepared transactions response.
- */
-public class GridCacheOptimisticCheckPreparedTxResponse extends GridDistributedBaseMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Mini future ID. */
- private IgniteUuid miniId;
-
- /** Flag indicating if all remote transactions were prepared. */
- private boolean success;
-
- /**
- * Empty constructor required by {@link Externalizable}
- */
- public GridCacheOptimisticCheckPreparedTxResponse() {
- // No-op.
- }
-
- /**
- * @param txId Transaction ID.
- * @param futId Future ID.
- * @param miniId Mini future ID.
- * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
- */
- public GridCacheOptimisticCheckPreparedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId,
- boolean success) {
- super(txId, 0);
-
- this.futId = futId;
- this.miniId = miniId;
- this.success = success;
- }
-
- /**
- * @return Future ID.
- */
- public IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Mini future ID.
- */
- public IgniteUuid miniId() {
- return miniId;
- }
-
- /**
- * @return {@code True} if all remote transactions were prepared.
- */
- public boolean success() {
- return success;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 8:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeIgniteUuid("miniId", miniId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeBoolean("success", success))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 8:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- miniId = reader.readIgniteUuid("miniId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- success = reader.readBoolean("success");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 17;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 11;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheOptimisticCheckPreparedTxResponse.class, this, "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
new file mode 100644
index 0000000..663ed90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Future verifying that all remote transactions related to transaction were prepared or committed.
+ */
+public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Trackable flag. */
+ private boolean trackable = true;
+
+ /** Context. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** Future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Transaction. */
+ private final IgniteInternalTx tx;
+
+ /** All involved nodes. */
+ private final Map<UUID, ClusterNode> nodes;
+
+ /** ID of failed node started transaction. */
+ private final UUID failedNodeId;
+
+ /** Transaction nodes mapping. */
+ private final Map<UUID, Collection<UUID>> txNodes;
+
+ /** */
+ private final boolean nearTxCheck;
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ * @param failedNodeId ID of failed node started transaction.
+ * @param txNodes Transaction mapping.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
+ IgniteInternalTx tx,
+ UUID failedNodeId,
+ Map<UUID, Collection<UUID>> txNodes)
+ {
+ super(cctx.kernalContext(), CU.boolReducer());
+
+ this.cctx = cctx;
+ this.tx = tx;
+ this.txNodes = txNodes;
+ this.failedNodeId = failedNodeId;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
+
+ nodes = new GridLeanMap<>();
+
+ UUID locNodeId = cctx.localNodeId();
+
+ for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
+ if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
+ ClusterNode node = cctx.discovery().node(e.getKey());
+
+ if (node != null)
+ nodes.put(node.id(), node);
+ else if (log.isDebugEnabled())
+ log.debug("Transaction node left (will ignore) " + e.getKey());
+ }
+
+ for (UUID nodeId : e.getValue()) {
+ if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ if (node != null)
+ nodes.put(node.id(), node);
+ else if (log.isDebugEnabled())
+ log.debug("Transaction node left (will ignore) " + e.getKey());
+ }
+ }
+ }
+
+ UUID nearNodeId = tx.eventNodeId();
+
+ nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
+ }
+
+ /**
+ * Initializes future.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void prepare() {
+ if (nearTxCheck) {
+ UUID nearNodeId = tx.eventNodeId();
+
+ if (cctx.localNodeId().equals(nearNodeId)) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ 0,
+ true,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nearNodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+
+ markInitialized();
+ }
+
+ return;
+ }
+
+ // First check transactions on local node.
+ int locTxNum = nodeTransactions(cctx.localNodeId());
+
+ if (locTxNum > 1) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
+
+ if (fut == null || fut.isDone()) {
+ boolean prepared;
+
+ try {
+ prepared = fut == null ? true : fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+
+ return;
+ }
+ }
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ boolean prepared;
+
+ try {
+ prepared = fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Check prepared transaction future failed: " + e, e);
+
+ prepared = false;
+ }
+
+ if (!prepared) {
+ onDone(false);
+
+ markInitialized();
+ }
+ else
+ proceedPrepare();
+ }
+ });
+
+ return;
+ }
+ }
+
+ proceedPrepare();
+ }
+
+ /**
+ * Process prepare after local check.
+ */
+ private void proceedPrepare() {
+ for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
+ UUID nodeId = entry.getKey();
+
+ // Skip left nodes and local node.
+ if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
+ continue;
+
+ /*
+ * If primary node failed then send message to all backups, otherwise
+ * send message only to primary node.
+ */
+
+ if (nodeId.equals(failedNodeId)) {
+ for (UUID id : entry.getValue()) {
+ // Skip backup node if it is local node or if it is also was mapped as primary.
+ if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
+ continue;
+
+ MiniFuture fut = new MiniFuture(id);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx,
+ nodeTransactions(id),
+ false,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(id, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break;
+ }
+ }
+ }
+ else {
+ MiniFuture fut = new MiniFuture(nodeId);
+
+ add(fut);
+
+ GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+ tx,
+ nodeTransactions(nodeId),
+ false,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nodeId, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ fut.onNodeLeft();
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+
+ break;
+ }
+ }
+ }
+
+ markInitialized();
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return Number of transactions on node.
+ */
+ private int nodeTransactions(UUID nodeId) {
+ int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
+
+ for (Collection<UUID> backups : txNodes.values()) {
+ for (UUID backup : backups) {
+ if (backup.equals(nodeId)) {
+ cnt++; // +1 if node is backup.
+
+ break;
+ }
+ }
+ }
+
+ return cnt;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
+ if (!isDone()) {
+ for (IgniteInternalFuture<Boolean> fut : pending()) {
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.nodeId().equals(nodeId);
+
+ f.onResult(res);
+
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return tx.xidVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return nodes.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ for (IgniteInternalFuture<?> fut : futures())
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.nodeId().equals(nodeId)) {
+ f.onNodeLeft();
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ trackable = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ cctx.mvcc().removeFuture(this);
+
+ if (err == null) {
+ assert res != null;
+
+ cctx.tm().finishTxOnRecovery(tx, res);
+ }
+ else {
+ if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check transaction on near node, " +
+ "ignoring [err=" + err + ", tx=" + tx + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check prepared transactions, " +
+ "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+
+ cctx.tm().salvageTx(tx);
+ }
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param f Future.
+ * @return {@code True} if mini-future.
+ */
+ private boolean isMini(IgniteInternalFuture<?> f) {
+ return f.getClass().equals(MiniFuture.class);
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryFuture.class, this, "super", super.toString());
+ }
+
+ /**
+ *
+ */
+ private class MiniFuture extends GridFutureAdapter<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Mini future ID. */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /**
+ * @param nodeId Node ID.
+ */
+ private MiniFuture(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ private UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ private IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @param e Error.
+ */
+ private void onError(Throwable e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+ onDone(e);
+ }
+
+ /**
+ */
+ private void onNodeLeft() {
+ if (log.isDebugEnabled())
+ log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
+
+ if (nearTxCheck) {
+ // Near and originating nodes left, need initiate tx check.
+ cctx.tm().commitIfPrepared(tx);
+
+ onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+ }
+ else
+ onDone(true);
+ }
+
+ /**
+ * @param res Result callback.
+ */
+ private void onResult(GridCacheTxRecoveryResponse res) {
+ onDone(res.success());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
new file mode 100644
index 0000000..259c288
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Message sent to check that transactions related to transaction were prepared on remote node.
+ */
+public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Near transaction ID. */
+ private GridCacheVersion nearXidVer;
+
+ /** Expected number of transactions on node. */
+ private int txNum;
+
+ /** System transaction flag. */
+ private boolean sys;
+
+ /** {@code True} if should check only tx on near node. */
+ private boolean nearTxCheck;
+
+ /**
+ * Empty constructor required by {@link Externalizable}
+ */
+ public GridCacheTxRecoveryRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param tx Transaction.
+ * @param txNum Expected number of transactions on remote node.
+ * @param nearTxCheck {@code True} if should check only tx on near node.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ */
+ public GridCacheTxRecoveryRequest(IgniteInternalTx tx,
+ int txNum,
+ boolean nearTxCheck,
+ IgniteUuid futId,
+ IgniteUuid miniId)
+ {
+ super(tx.xidVersion(), 0);
+
+ nearXidVer = tx.nearXidVersion();
+ sys = tx.system();
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.txNum = txNum;
+ this.nearTxCheck = nearTxCheck;
+ }
+
+ /**
+ * @return {@code True} if should check only tx on near node.
+ */
+ public boolean nearTxCheck() {
+ return nearTxCheck;
+ }
+
+ /**
+ * @return Near version.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Expected number of transactions on node.
+ */
+ public int transactions() {
+ return txNum;
+ }
+
+ /**
+ * @return System transaction flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 8:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeBoolean("sys", sys))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeInt("txNum", txNum))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 8:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ nearTxCheck = reader.readBoolean("nearTxCheck");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ nearXidVer = reader.readMessage("nearXidVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ sys = reader.readBoolean("sys");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ txNum = reader.readInt("txNum");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 16;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 14;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
new file mode 100644
index 0000000..e5c026a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Transactions recovery check response.
+ */
+public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Flag indicating if all remote transactions were prepared. */
+ private boolean success;
+
+ /**
+ * Empty constructor required by {@link Externalizable}
+ */
+ public GridCacheTxRecoveryResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param txId Transaction ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
+ */
+ public GridCacheTxRecoveryResponse(GridCacheVersion txId,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ boolean success)
+ {
+ super(txId, 0);
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.success = success;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return {@code True} if all remote transactions were prepared.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 8:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("success", success))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 8:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ success = reader.readBoolean("success");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 17;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 11;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTxRecoveryResponse.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 3a685cc..c5ef22f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.affinity.*;
@@ -30,17 +31,17 @@ import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.datastreamer.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
-import java.util.concurrent.*;
-import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
/**
* Distributed cache implementation.
@@ -142,21 +143,28 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
try {
AffinityTopologyVersion topVer;
+ boolean retry;
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ boolean skipStore = opCtx != null && opCtx.skipStore();
+
do {
+ retry = false;
+
topVer = ctx.affinity().affinityTopologyVersion();
// Send job to all data nodes.
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- ctx.closures().callAsyncNoFailover(BROADCAST,
- new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes,
- true).get();
+ retry = !ctx.kernalContext().task().execute(
+ new RemoveAllTask(ctx.name(), topVer, skipStore), null).get();
}
}
- while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
+ while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -170,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
- removeAllAsync(opFut, topVer);
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ boolean skipStore = opCtx != null && opCtx.skipStore();
+
+ removeAllAsync(opFut, topVer, skipStore);
return opFut;
}
@@ -178,27 +190,29 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/**
* @param opFut Future.
* @param topVer Topology version.
+ * @param skipStore Skip store flag.
*/
- private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) {
+ private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer,
+ final boolean skipStore) {
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,
- new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, true);
+ IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(
+ new RemoveAllTask(ctx.name(), topVer, skipStore), null);
- rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
+ rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
try {
- fut.get();
+ boolean retry = !fut.get();
AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion();
- if (topVer0.equals(topVer))
+ if (topVer0.equals(topVer) && !retry)
opFut.onDone();
else
- removeAllAsync(opFut, topVer0);
+ removeAllAsync(opFut, topVer0, skipStore);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -227,97 +241,150 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
}
/**
- * Internal callable which performs remove all primary key mappings
- * operation on a cache with the given name.
+ * Remove task.
*/
@GridInternal
- private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+ private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> {
/** */
private static final long serialVersionUID = 0L;
/** Cache name. */
- private String cacheName;
+ private final String cacheName;
- /** Topology version. */
- private AffinityTopologyVersion topVer;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
/** Skip store flag. */
- private boolean skipStore;
-
- /** Injected grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
+ private final boolean skipStore;
/**
- * Empty constructor for serialization.
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
+ * @param skipStore Skip store flag.
*/
- public GlobalRemoveAllCallable() {
- // No-op.
+ public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ this.skipStore = skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) throws IgniteException {
+ Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+ for (ComputeJobResult locRes : results) {
+ if (locRes != null && (locRes.getException() != null || !locRes.<Boolean>getData()))
+ return false;
+ }
+
+ return true;
}
+ }
+ /**
+ * Internal job which performs remove all primary key mappings
+ * operation on a cache with the given name.
+ */
+ @GridInternal
+ private static class GlobalRemoveAllJob<K,V> extends TopologyVersionAwareJob {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Skip store flag. */
+ private final boolean skipStore;
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @param skipStore Skip store flag.
*/
- private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
- this.cacheName = cacheName;
- this.topVer = topVer;
+ private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
+ super(cacheName, topVer);
+
this.skipStore = skipStore;
}
- /**
- * {@inheritDoc}
- */
- @Override public Object call() throws Exception {
- GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+ /** {@inheritDoc} */
+ @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) {
+ GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName);
- final GridCacheContext<K, V> ctx = cacheAdapter.context();
+ if (cache == null)
+ return true;
- ctx.affinity().affinityReadyFuture(topVer).get();
+ final GridCacheContext<K, V> ctx = cache.context();
ctx.gate().enter();
try {
if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
- return null; // Ignore this remove request because remove request will be sent again.
+ return false; // Ignore this remove request because remove request will be sent again.
GridDhtCacheAdapter<K, V> dht;
GridNearCacheAdapter<K, V> near = null;
- if (cacheAdapter instanceof GridNearCacheAdapter) {
- near = ((GridNearCacheAdapter<K, V>)cacheAdapter);
+ if (cache instanceof GridNearCacheAdapter) {
+ near = ((GridNearCacheAdapter<K, V>) cache);
dht = near.dht();
}
else
- dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
+ dht = (GridDhtCacheAdapter<K, V>) cache;
try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
- (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
- ((DataStreamerImpl)dataLdr).maxRemapCount(0);
+ (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
+ ((DataStreamerImpl) dataLdr).maxRemapCount(0);
dataLdr.skipStore(skipStore);
dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
- for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
- if (!locPart.isEmpty() && locPart.primary(topVer)) {
- for (GridDhtCacheEntry o : locPart.entries()) {
- if (!o.obsoleteOrDeleted())
- dataLdr.removeDataInternal(o.key());
- }
- }
- }
+ for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) {
+ GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
- Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer);
+ if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve())
+ return false;
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ try {
+ if (!locPart.isEmpty()) {
+ for (GridDhtCacheEntry o : locPart.entries()) {
+ if (!o.obsoleteOrDeleted())
+ dataLdr.removeDataInternal(o.key());
+ }
+ }
- it = dht.context().swap().swapKeyIterator(true, false, topVer);
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+ dht.context().swap().iterator(part);
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ if (iter != null) {
+ for (Map.Entry<byte[], GridCacheSwapEntry> e : iter)
+ dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+ }
+ }
+ finally {
+ locPart.release();
+ }
+ }
}
if (near != null) {
@@ -329,25 +396,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
}
}
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
ctx.gate().leave();
}
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
- out.writeObject(topVer);
- out.writeBoolean(skipStore);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
- topVer = (AffinityTopologyVersion)in.readObject();
- skipStore = in.readBoolean();
+ return true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index fd1040f..c5ac847 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -83,12 +82,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
/** Key count. */
private int txSize;
- /** Group lock key if this is a group-lock transaction. */
- private IgniteTxKey grpLockKey;
-
- /** Partition lock flag. Only if group-lock transaction. */
- private boolean partLock;
-
/**
* Additional flags.
* GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value.
@@ -116,9 +109,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
* @param timeout Lock timeout.
* @param keyCnt Number of keys.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
- * @param partLock {@code True} if this is a group-lock transaction request and whole partition is
- * locked.
* @param skipStore Skip store flag.
*/
public GridDistributedLockRequest(
@@ -135,8 +125,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
long timeout,
int keyCnt,
int txSize,
- @Nullable IgniteTxKey grpLockKey,
- boolean partLock,
boolean skipStore
) {
super(lockVer, keyCnt);
@@ -156,8 +144,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
this.isInvalidate = isInvalidate;
this.timeout = timeout;
this.txSize = txSize;
- this.grpLockKey = grpLockKey;
- this.partLock = partLock;
retVals = new boolean[keyCnt];
@@ -295,27 +281,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
}
/**
- * @return {@code True} if lock request for group-lock transaction.
- */
- public boolean groupLock() {
- return grpLockKey != null;
- }
-
- /**
- * @return Group lock key.
- */
- @Nullable public IgniteTxKey groupLockKey() {
- return grpLockKey;
- }
-
- /**
- * @return {@code True} if partition is locked in group-lock transaction.
- */
- public boolean partitionLock() {
- return partLock;
- }
-
- /**
* @return Max lock wait time.
*/
public long timeout() {
@@ -330,9 +295,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
GridCacheContext cctx = ctx.cacheContext(cacheId);
prepareMarshalCacheObjects(keys, cctx);
-
- if (grpLockKey != null)
- grpLockKey.prepareMarshal(cctx);
}
/** {@inheritDoc} */
@@ -342,9 +304,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
GridCacheContext cctx = ctx.cacheContext(cacheId);
finishUnmarshalCacheObjects(keys, cctx, ldr);
-
- if (grpLockKey != null)
- grpLockKey.finishUnmarshal(cctx, ldr);
}
/** {@inheritDoc} */
@@ -375,78 +334,66 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
writer.incrementState();
case 10:
- if (!writer.writeMessage("grpLockKey", grpLockKey))
- return false;
-
- writer.incrementState();
-
- case 11:
if (!writer.writeBoolean("isInTx", isInTx))
return false;
writer.incrementState();
- case 12:
+ case 11:
if (!writer.writeBoolean("isInvalidate", isInvalidate))
return false;
writer.incrementState();
- case 13:
+ case 12:
if (!writer.writeBoolean("isRead", isRead))
return false;
writer.incrementState();
- case 14:
+ case 13:
if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
- case 15:
+ case 14:
if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 16:
+ case 15:
if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
- case 17:
+ case 16:
if (!writer.writeUuid("nodeId", nodeId))
return false;
writer.incrementState();
- case 18:
- if (!writer.writeBoolean("partLock", partLock))
- return false;
-
- writer.incrementState();
-
- case 19:
+ case 17:
if (!writer.writeBooleanArray("retVals", retVals))
return false;
writer.incrementState();
- case 20:
+ case 18:
if (!writer.writeLong("threadId", threadId))
return false;
writer.incrementState();
- case 21:
+ case 19:
if (!writer.writeLong("timeout", timeout))
return false;
writer.incrementState();
- case 22:
+ case 20:
if (!writer.writeInt("txSize", txSize))
return false;
@@ -485,14 +432,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
case 10:
- grpLockKey = reader.readMessage("grpLockKey");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
isInTx = reader.readBoolean("isInTx");
if (!reader.isLastRead())
@@ -500,7 +439,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 12:
+ case 11:
isInvalidate = reader.readBoolean("isInvalidate");
if (!reader.isLastRead())
@@ -508,7 +447,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 13:
+ case 12:
isRead = reader.readBoolean("isRead");
if (!reader.isLastRead())
@@ -516,7 +455,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 14:
+ case 13:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -528,7 +467,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 15:
+ case 14:
keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -536,7 +475,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 16:
+ case 15:
nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
@@ -544,7 +483,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 17:
+ case 16:
nodeId = reader.readUuid("nodeId");
if (!reader.isLastRead())
@@ -552,15 +491,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 18:
- partLock = reader.readBoolean("partLock");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
+ case 17:
retVals = reader.readBooleanArray("retVals");
if (!reader.isLastRead())
@@ -568,7 +499,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 20:
+ case 18:
threadId = reader.readLong("threadId");
if (!reader.isLastRead())
@@ -576,7 +507,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 21:
+ case 19:
timeout = reader.readLong("timeout");
if (!reader.isLastRead())
@@ -584,7 +515,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
reader.incrementState();
- case 22:
+ case 20:
txSize = reader.readInt("txSize");
if (!reader.isLastRead())
@@ -604,7 +535,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 23;
+ return 21;
}
/** {@inheritDoc} */