You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 11:24:22 UTC
[08/10] ignite git commit: ignite-5932
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d6670e8c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d6670e8c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d6670e8c
Branch: refs/heads/ignite-5932
Commit: d6670e8c80b225e29c04150f10a26c51de66b0f6
Parents: af88754
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 13:43:49 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 13:43:49 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 22 +-
.../near/GridNearTxFinishAndAckFuture.java | 16 +-
.../cache/distributed/near/GridNearTxLocal.java | 4 +
.../cache/mvcc/CacheCoordinatorsProcessor.java | 95 ++++++---
.../cache/mvcc/CoordinatorAckRequestQuery.java | 130 ++++++++++++
.../cache/mvcc/CoordinatorAckRequestTx.java | 201 +++++++++++++++++++
.../mvcc/CoordinatorAckRequestTxAndQuery.java | 120 +++++++++++
.../mvcc/CoordinatorAckRequestTxAndQueryEx.java | 144 +++++++++++++
.../cache/mvcc/CoordinatorQueryAckRequest.java | 130 ------------
.../cache/mvcc/CoordinatorTxAckRequest.java | 194 ------------------
.../processors/cache/mvcc/MvccCounter.java | 2 +-
.../processors/cache/mvcc/MvccQueryTracker.java | 37 ++++
.../cache/mvcc/PreviousCoordinatorQueries.java | 16 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 8 +-
14 files changed, 747 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 99bc8af..6a59c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -103,10 +103,12 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQueryEx;
import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestQuery;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTx;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQuery;
import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
@@ -892,7 +894,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 131: // TODO IGNITE-3478 fix constants.
- msg = new CoordinatorTxAckRequest();
+ msg = new CoordinatorAckRequestTx();
break;
@@ -907,7 +909,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 134:
- msg = new CoordinatorQueryAckRequest();
+ msg = new CoordinatorAckRequestQuery();
break;
@@ -937,6 +939,16 @@ public class GridIoMessageFactory implements MessageFactory {
return msg;
case 141:
+ msg = new CoordinatorAckRequestTxAndQuery();
+
+ return msg;
+
+ case 142:
+ msg = new CoordinatorAckRequestTxAndQueryEx();
+
+ return msg;
+
+ case 143:
msg = new MvccCounter();
return msg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index c24551b..5d8b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,12 +54,21 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
@Override public void apply(final GridNearTxFinishFuture fut) {
GridNearTxLocal tx = fut.tx();
+ IgniteInternalFuture<Void> ackFut = null;
+
+ MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
TxMvccInfo mvccInfo = tx.mvccInfo();
- if (mvccInfo != null) {
- IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit(
- mvccInfo.coordinator(), mvccInfo.version());
+ if (qryTracker != null)
+ ackFut = qryTracker.onTxFinish(mvccInfo, fut.context());
+ else if (mvccInfo != null) {
+ ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinator(),
+ mvccInfo.version(),
+ null);
+ }
+ if (ackFut != null) {
ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> ackFut) {
Exception err = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 08f20de..c774f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -237,6 +237,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
trackTimeout = cctx.time().addTimeoutObject(this);
}
+ MvccQueryTracker mvccQueryTracker() {
+ return mvccTracker;
+ }
+
/** {@inheritDoc} */
@Override public boolean near() {
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b89ce73..59eae1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -176,7 +176,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
- statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+ statCntrs[2] = new StatCounter("CoordinatorAckRequestTx");
statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
statCntrs[4] = new StatCounter("TotalRequests");
statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
@@ -331,20 +331,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
assert crd != null;
- long trackCntr = mvccVer.counter();
-
- MvccLongList txs = mvccVer.activeTransactions();
+ long trackCntr = queryTrackCounter(mvccVer);
- if (txs != null) {
- for (int i = 0; i < txs.size(); i++) {
- long txId = txs.get(i);
-
- if (txId < trackCntr)
- trackCntr = txId;
- }
- }
-
- Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) :
+ Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorAckRequestQuery(trackCntr) :
new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);
try {
@@ -363,6 +352,27 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param mvccVer Read version.
+ * @return
+ */
+ private long queryTrackCounter(MvccCoordinatorVersion mvccVer) {
+ long trackCntr = mvccVer.counter();
+
+ MvccLongList txs = mvccVer.activeTransactions();
+
+ int size = txs.size();
+
+ for (int i = 0; i < size; i++) {
+ long txId = txs.get(i);
+
+ if (txId < trackCntr)
+ trackCntr = txId;
+ }
+
+ return trackCntr;
+ }
+
+ /**
* @param crd Coordinator.
* @return Counter request future.
*/
@@ -422,22 +432,42 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/**
* @param crd Coordinator.
- * @param mvccVer Transaction version.
+ * @param updateVer Transaction update version.
+ * @param readVer Transaction read version.
* @return Acknowledge future.
*/
- public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
+ public IgniteInternalFuture<Void> ackTxCommit(UUID crd,
+ MvccCoordinatorVersion updateVer,
+ @Nullable MvccCoordinatorVersion readVer) {
assert crd != null;
- assert mvccVer != null;
+ assert updateVer != null;
WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
ackFuts.put(fut.id, fut);
+ MvccCoordinatorMessage msg;
+
+ if (readVer != null) {
+ long trackCntr = queryTrackCounter(readVer);
+
+ if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
+ msg = new CoordinatorAckRequestTxAndQuery(fut.id,
+ updateVer.counter(),
+ trackCntr);
+ }
+ else {
+ msg = new CoordinatorAckRequestTxAndQueryEx(fut.id,
+ updateVer.counter(),
+ readVer.coordinatorVersion(),
+ trackCntr);
+ }
+ }
+ else
+ msg = new CoordinatorAckRequestTx(fut.id, updateVer.counter());
+
try {
- ctx.io().sendToGridTopic(crd,
- MSG_TOPIC,
- new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
- MSG_POLICY);
+ ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
}
catch (IgniteCheckedException e) {
if (ackFuts.remove(fut.id) != null) {
@@ -456,7 +486,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param mvccVer Transaction version.
*/
public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
- CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());
+ CoordinatorAckRequestTx msg = new CoordinatorAckRequestTx(0, mvccVer.counter());
msg.skipResponse(true);
@@ -568,7 +598,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param nodeId Node ID.
* @param msg Message.
*/
- private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
+ private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorAckRequestQuery msg) {
onQueryDone(nodeId, msg.counter());
}
@@ -577,16 +607,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param msg Message.
*/
private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
- prevCrdQueries.onQueryDone(nodeId, msg);
+ prevCrdQueries.onQueryDone(nodeId, msg.coordinatorVersion(), msg.counter());
}
/**
* @param nodeId Sender node ID.
* @param msg Message.
*/
- private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+ private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
onTxDone(msg.txCounter());
+ if (msg.queryCounter() != COUNTER_NA) {
+ if (msg.queryCoordinatorVersion() == 0)
+ onQueryDone(nodeId, msg.queryCounter());
+ else
+ prevCrdQueries.onQueryDone(nodeId, msg.queryCoordinatorVersion(), msg.queryCounter());
+ }
+
if (STAT_CNTRS)
statCntrs[2].update();
@@ -1238,12 +1275,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
if (msg instanceof CoordinatorTxCounterRequest)
processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
- else if (msg instanceof CoordinatorTxAckRequest)
- processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+ else if (msg instanceof CoordinatorAckRequestTx)
+ processCoordinatorTxAckRequest(nodeId, (CoordinatorAckRequestTx)msg);
else if (msg instanceof CoordinatorFutureResponse)
processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
- else if (msg instanceof CoordinatorQueryAckRequest)
- processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
+ else if (msg instanceof CoordinatorAckRequestQuery)
+ processCoordinatorQueryAckRequest(nodeId, (CoordinatorAckRequestQuery)msg);
else if (msg instanceof CoordinatorQueryVersionRequest)
processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
else if (msg instanceof MvccCoordinatorVersionResponse)
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
new file mode 100644
index 0000000..e51ec90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestQuery implements MvccCoordinatorMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long cntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param cntr Query counter.
+ */
+ CoordinatorAckRequestQuery(long cntr) {
+ this.cntr = cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Counter.
+ */
+ public long counter() {
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("cntr", cntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cntr = reader.readLong("cntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestQuery.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 134;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestQuery.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
new file mode 100644
index 0000000..a3904fb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+ /** */
+ private long futId;
+
+ /** */
+ private long txCntr;
+
+ /** */
+ private byte flags;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTx() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction.
+ */
+ CoordinatorAckRequestTx(long futId, long txCntr) {
+ this.futId = futId;
+ this.txCntr = txCntr;
+ }
+
+ long queryCounter() {
+ return CacheCoordinatorsProcessor.COUNTER_NA;
+ }
+
+ long queryCoordinatorVersion() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ long futureId() {
+ return futId;
+ }
+
+ /**
+ * @return {@code True} if response message is not needed.
+ */
+ boolean skipResponse() {
+ return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param val {@code True} if response message is not needed.
+ */
+ void skipResponse(boolean val) {
+ if (val)
+ flags |= SKIP_RESPONSE_FLAG_MASK;
+ else
+ flags &= ~SKIP_RESPONSE_FLAG_MASK;
+ }
+
+ /**
+ * @return Counter assigned tp transaction.
+ */
+ public long txCounter() {
+ return txCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("txCntr", txCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ txCntr = reader.readLong("txCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 131;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTx.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
new file mode 100644
index 0000000..91f27b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTxAndQuery extends CoordinatorAckRequestTx {
+ /** */
+ private long qryCntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTxAndQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryCntr Counter assigned for transaction reads.
+ */
+ CoordinatorAckRequestTxAndQuery(long futId, long txCntr, long qryCntr) {
+ super(futId, txCntr);
+
+ this.qryCntr = qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCounter() {
+ return qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryCntr", qryCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryCntr = reader.readLong("qryCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTxAndQuery.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 141;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTxAndQuery.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
new file mode 100644
index 0000000..1808697
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTxAndQueryEx extends CoordinatorAckRequestTx {
+ /** */
+ private long qryCrdVer;
+
+ /** */
+ private long qryCntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTxAndQueryEx() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryCrdVer Version of coordinator assigned read counter.
+ * @param qryCntr Counter assigned for transaction reads.
+ */
+ CoordinatorAckRequestTxAndQueryEx(long futId, long txCntr, long qryCrdVer, long qryCntr) {
+ super(futId, txCntr);
+
+ this.qryCrdVer = qryCrdVer;
+ this.qryCntr = qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCoordinatorVersion() {
+ return qryCrdVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCounter() {
+ return qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryCntr", qryCntr))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("qryCrdVer", qryCrdVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryCntr = reader.readLong("qryCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ qryCrdVer = reader.readLong("qryCrdVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTxAndQueryEx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 142;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTxAndQueryEx.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
deleted file mode 100644
index 602d3b4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long cntr;
-
- /**
- * Required by {@link GridIoMessageFactory}.
- */
- public CoordinatorQueryAckRequest() {
- // No-op.
- }
-
- /**
- * @param cntr Query counter.
- */
- CoordinatorQueryAckRequest(long cntr) {
- this.cntr = cntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean waitForCoordinatorInit() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean processedFromNioThread() {
- return true;
- }
-
- /**
- * @return Counter.
- */
- public long counter() {
- return cntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeLong("cntr", cntr))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- cntr = reader.readLong("cntr");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(CoordinatorQueryAckRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 134;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CoordinatorQueryAckRequest.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
deleted file mode 100644
index 14cd6a9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorTxAckRequest implements MvccCoordinatorMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
-
- /** */
- private long futId;
-
- /** */
- private long txCntr;
-
- /** */
- private byte flags;
-
- /**
- * Required by {@link GridIoMessageFactory}.
- */
- public CoordinatorTxAckRequest() {
- // No-op.
- }
-
- /**
- * @param futId Future ID.
- * @param txCntr Counter assigned to transaction.
- */
- CoordinatorTxAckRequest(long futId, long txCntr) {
- this.futId = futId;
- this.txCntr = txCntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean waitForCoordinatorInit() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean processedFromNioThread() {
- return true;
- }
-
- /**
- * @return Future ID.
- */
- long futureId() {
- return futId;
- }
-
- /**
- * @return {@code True} if response message is not needed.
- */
- boolean skipResponse() {
- return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
- }
-
- /**
- * @param val {@code True} if response message is not needed.
- */
- void skipResponse(boolean val) {
- if (val)
- flags |= SKIP_RESPONSE_FLAG_MASK;
- else
- flags &= ~SKIP_RESPONSE_FLAG_MASK;
- }
-
- /**
- * @return Counter assigned tp transaction.
- */
- public long txCounter() {
- return txCntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong("txCntr", txCntr))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- txCntr = reader.readLong("txCntr");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(CoordinatorTxAckRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 131;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CoordinatorTxAckRequest.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
index bec3301..33407b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -143,7 +143,7 @@ public class MvccCounter implements Message {
/** {@inheritDoc} */
@Override public short directType() {
- return 141;
+ return 143;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 8c421fc..e45b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteInClosure;
@@ -115,6 +116,42 @@ public class MvccQueryTracker {
cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
}
+ public IgniteInternalFuture<Void> onTxFinish(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx) {
+ MvccCoordinator mvccCrd0 = null;
+ MvccCoordinatorVersion mvccVer0 = null;
+
+ synchronized (this) {
+ if (mvccVer != null) {
+ assert mvccCrd != null;
+
+ mvccCrd0 = mvccCrd;
+ mvccVer0 = mvccVer;
+
+ mvccVer = null; // Mark as finished.
+ }
+ }
+
+ if (mvccVer0 != null) {
+ if (mvccInfo == null) {
+ cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+ return null;
+ }
+ else if (mvccInfo.coordinator().equals(mvccCrd0.nodeId()))
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), mvccVer0);
+ else {
+ cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+ }
+ }
+
+ if (mvccInfo != null)
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+
+ return null;
+ }
+
/**
* @param topVer Topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 700b27d..667865b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -158,23 +158,27 @@ class PreviousCoordinatorQueries {
/**
* @param nodeId Node ID.
- * @param msg Message.
+ * @param crdVer Coordinator version.
+ * @param cntr Counter.
*/
- void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+ void onQueryDone(UUID nodeId, long crdVer, long cntr) {
+ assert crdVer != 0;
+ assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+
synchronized (this) {
- MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());
+ MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);
Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
if (nodeQueries == null)
activeQueries.put(nodeId, nodeQueries = new HashMap<>());
- Integer qryCnt = nodeQueries.get(cntr);
+ Integer qryCnt = nodeQueries.get(mvccCntr);
int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
if (newQryCnt == 0) {
- nodeQueries.remove(cntr);
+ nodeQueries.remove(mvccCntr);
if (nodeQueries.isEmpty()) {
activeQueries.remove(nodeId);
@@ -184,7 +188,7 @@ class PreviousCoordinatorQueries {
}
}
else
- nodeQueries.put(cntr, newQryCnt);
+ nodeQueries.put(mvccCntr, newQryCnt);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 82201ea..8964cd4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -655,7 +655,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
boolean block = true;
@Override public boolean apply(ClusterNode node, Message msg) {
- if (block && msg instanceof CoordinatorTxAckRequest) {
+ if (block && msg instanceof CoordinatorAckRequestTx) {
block = false;
return true;
@@ -991,7 +991,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
@Override public void apply(ClusterNode node, Message msg) {
- if (msg instanceof CoordinatorTxAckRequest)
+ if (msg instanceof CoordinatorAckRequestTx)
doSleep(2000);
}
});
@@ -1110,7 +1110,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
private boolean blocked;
@Override public boolean apply(ClusterNode node, Message msg) {
- if (!blocked && (msg instanceof CoordinatorTxAckRequest)) {
+ if (!blocked && (msg instanceof CoordinatorAckRequestTx)) {
blocked = true;
return true;
@@ -2055,7 +2055,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
srvSpi.blockMessages(GridNearGetResponse.class, getTestIgniteInstanceName(1));
- TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class,
+ TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorAckRequestQuery.class,
getTestIgniteInstanceName(0));
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {