You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/16 11:50:47 UTC
[3/5] ignite git commit: ignite-3478 Support for optimistic
transactions
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
new file mode 100644
index 0000000..9f5e0b8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public interface MvccCoordinatorChangeAware {
+ /**
+ * @param newCrd New coordinator.
+ * @return Version used by this query.
+ */
+ @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java
new file mode 100644
index 0000000..2d4e97b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+public interface MvccCoordinatorFuture {
+ /**
+ * @return Coordinator node ID.
+ */
+ public UUID coordinatorNodeId();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index d80e43c..5b2e69e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -42,4 +42,9 @@ public interface MvccCoordinatorVersion extends Message {
* @return Counter.
*/
public long counter();
+
+ /**
+ * @return Version without active transactions.
+ */
+ public MvccCoordinatorVersion withoutActiveTransactions();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index c037226..b6a4b1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -46,7 +46,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
private int txsCnt;
/** */
- private long[] txs; // TODO IGNITE-3478 (do not send on backups?)
+ private long[] txs;
/** */
private long cleanupVer;
@@ -63,7 +63,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
* @param cntr Counter.
* @param cleanupVer Cleanup version.
*/
- public MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) {
+ MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) {
this.crdVer = crdVer;
this.cntr = cntr;
this.cleanupVer = cleanupVer;
@@ -154,6 +154,14 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
}
/** {@inheritDoc} */
+ @Override public MvccCoordinatorVersion withoutActiveTransactions() {
+ if (txsCnt > 0)
+ return new MvccCoordinatorVersionWithoutTxs(crdVer, cntr, cleanupVer);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public long coordinatorVersion() {
return crdVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java
new file mode 100644
index 0000000..f4a7378
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccCoordinatorVersionWithoutTxs implements MvccCoordinatorVersion {
+ /** */
+ private long crdVer;
+
+ /** */
+ private long cntr;
+
+ /** */
+ private long cleanupVer;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public MvccCoordinatorVersionWithoutTxs() {
+ // No-op.
+ }
+
+ /**
+ * @param crdVer Coordinator version.
+ * @param cntr Counter.
+ * @param cleanupVer Cleanup version.
+ */
+ public MvccCoordinatorVersionWithoutTxs(long crdVer, long cntr, long cleanupVer) {
+ this.crdVer = crdVer;
+ this.cntr = cntr;
+ this.cleanupVer = cleanupVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MvccLongList activeTransactions() {
+ return MvccEmptyLongList.INSTANCE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long coordinatorVersion() {
+ return crdVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long cleanupVersion() {
+ return cleanupVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long counter() {
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MvccCoordinatorVersion withoutActiveTransactions() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("cleanupVer", cleanupVer))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("cntr", cntr))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("crdVer", crdVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cleanupVer = reader.readLong("cleanupVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ cntr = reader.readLong("cntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ crdVer = reader.readLong("crdVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(MvccCoordinatorVersionWithoutTxs.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 145;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccCoordinatorVersionWithoutTxs.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
index bec3301..d2fac94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
*/
public class MvccCounter implements Message {
/** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
private long crdVer;
/** */
@@ -143,7 +146,7 @@ public class MvccCounter implements Message {
/** {@inheritDoc} */
@Override public short directType() {
- return 141;
+ return 143;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java
new file mode 100644
index 0000000..7963685
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class MvccEmptyLongList implements MvccLongList {
+ /** */
+ public static MvccEmptyLongList INSTANCE = new MvccEmptyLongList();
+
+ /**
+ *
+ */
+ private MvccEmptyLongList() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long get(int i) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(long val) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "MvccEmptyLongList[]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
deleted file mode 100644
index d5172c6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public interface MvccQueryAware {
- /**
- * @param newCrd New coordinator.
- * @return Version used by this query.
- */
- @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
-
- /**
- * @param topVer Topology version when version was requested.
- */
- public void onMvccVersionReceived(AffinityTopologyVersion topVer);
-
- /**
- * @param e Error.
- */
- public void onMvccVersionError(IgniteCheckedException e);
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 360af4c..ad933d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -23,15 +23,17 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
/**
* TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop
*/
-public class MvccQueryTracker {
+public class MvccQueryTracker implements MvccCoordinatorChangeAware {
/** */
private MvccCoordinator mvccCrd;
@@ -47,14 +49,17 @@ public class MvccQueryTracker {
/** */
@GridToStringExclude
- private final MvccQueryAware lsnr;
+ private final IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> lsnr;
/**
* @param cctx Cache context.
* @param canRemap {@code True} if can wait for topology changes.
* @param lsnr Listener.
*/
- public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) {
+ public MvccQueryTracker(GridCacheContext cctx,
+ boolean canRemap,
+ IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> lsnr)
+ {
assert cctx.mvccEnabled() : cctx.name();
this.cctx = cctx;
@@ -115,13 +120,53 @@ public class MvccQueryTracker {
}
/**
+ * @param mvccInfo Mvcc update info.
+ * @param ctx Context.
+ * @param commit If {@code true} ack commit, otherwise rollback.
+ * @return Commit ack future.
+ */
+ public IgniteInternalFuture<Void> onTxDone(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx, boolean commit) {
+ MvccCoordinator mvccCrd0 = null;
+ MvccCoordinatorVersion mvccVer0 = null;
+
+ synchronized (this) {
+ if (mvccVer != null) {
+ assert mvccCrd != null;
+
+ mvccCrd0 = mvccCrd;
+ mvccVer0 = mvccVer;
+
+ mvccVer = null; // Mark as finished.
+ }
+ }
+
+ assert mvccVer0 == null || mvccInfo == null || mvccInfo.coordinatorNodeId().equals(mvccCrd0.nodeId());
+
+ if (mvccVer0 != null || mvccInfo != null) {
+ if (mvccInfo == null) {
+ cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+ return null;
+ }
+ else {
+ if (commit)
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
+ else
+ ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
+ }
+ }
+
+ return null;
+ }
+
+ /**
* @param topVer Topology version.
*/
public void requestVersion(final AffinityTopologyVersion topVer) {
MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer);
if (mvccCrd0 == null) {
- lsnr.onMvccVersionError(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+ lsnr.apply(null, CacheCoordinatorsProcessor.noCoordinatorError(topVer));
return;
}
@@ -136,7 +181,7 @@ public class MvccQueryTracker {
assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0;
if (!canRemap) {
- lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed."));
+ lsnr.apply(null, new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed."));
return;
}
@@ -147,6 +192,7 @@ public class MvccQueryTracker {
}
}
+ // TODO IGNITE-3478: get rid of future creation in 'requestQueryCounter'.
IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
cctx.shared().coordinators().requestQueryCounter(mvccCrd0);
@@ -172,7 +218,7 @@ public class MvccQueryTracker {
}
if (!needRemap) {
- lsnr.onMvccVersionReceived(topVer);
+ lsnr.apply(topVer, null);
return;
}
@@ -184,7 +230,7 @@ public class MvccQueryTracker {
log.debug("Mvcc coordinator failed, need remap: " + e);
}
catch (IgniteCheckedException e) {
- lsnr.onMvccVersionError(e);
+ lsnr.apply(null, e);
return;
}
@@ -193,7 +239,7 @@ public class MvccQueryTracker {
if (canRemap)
waitNextTopology(topVer);
else {
- lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to " +
+ lsnr.apply(null, new ClusterTopologyCheckedException("Failed to " +
"request mvcc version, coordinator failed."));
}
}
@@ -218,7 +264,7 @@ public class MvccQueryTracker {
requestVersion(fut.get());
}
catch (IgniteCheckedException e) {
- lsnr.onMvccVersionError(e);
+ lsnr.apply(null, e);
}
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 700b27d..5c56f40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -51,11 +51,11 @@ class PreviousCoordinatorQueries {
private boolean initDone;
/**
- * @param srvNodesQueries Active queries started on server nodes.
+ * @param nodeQueries Active queries map.
* @param discoCache Discovery data.
* @param mgr Discovery manager.
*/
- void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
+ void init(Map<UUID, Map<MvccCounter, Integer>> nodeQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
synchronized (this) {
assert !initDone;
assert waitNodes == null;
@@ -63,14 +63,16 @@ class PreviousCoordinatorQueries {
waitNodes = new HashSet<>();
for (ClusterNode node : discoCache.allNodes()) {
- if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id()))
+ if ((nodeQueries == null || !nodeQueries.containsKey(node.id())) &&
+ mgr.alive(node) &&
+ !F.contains(rcvd, node.id()))
waitNodes.add(node.id());
}
initDone = waitNodes.isEmpty();
- if (srvNodesQueries != null) {
- for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet())
+ if (nodeQueries != null) {
+ for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : nodeQueries.entrySet())
addAwaitedActiveQueries(e.getKey(), e.getValue());
}
@@ -123,7 +125,7 @@ class PreviousCoordinatorQueries {
* @param nodeId Node ID.
* @param nodeQueries Active queries started on node.
*/
- void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) {
+ void addNodeActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) {
synchronized (this) {
if (initDone)
return;
@@ -158,23 +160,27 @@ class PreviousCoordinatorQueries {
/**
* @param nodeId Node ID.
- * @param msg Message.
+ * @param crdVer Coordinator version.
+ * @param cntr Counter.
*/
- void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+ void onQueryDone(UUID nodeId, long crdVer, long cntr) {
+ assert crdVer != 0;
+ assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+
synchronized (this) {
- MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());
+ MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);
Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
if (nodeQueries == null)
activeQueries.put(nodeId, nodeQueries = new HashMap<>());
- Integer qryCnt = nodeQueries.get(cntr);
+ Integer qryCnt = nodeQueries.get(mvccCntr);
int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
if (newQryCnt == 0) {
- nodeQueries.remove(cntr);
+ nodeQueries.remove(mvccCntr);
if (nodeQueries.isEmpty()) {
activeQueries.remove(nodeId);
@@ -184,7 +190,7 @@ class PreviousCoordinatorQueries {
}
}
else
- nodeQueries.put(cntr, newQryCnt);
+ nodeQueries.put(mvccCntr, newQryCnt);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
index 428d707..96a9864 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
@@ -29,6 +29,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
*/
public class TxMvccInfo implements Message {
/** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
private UUID crd;
/** */
@@ -42,8 +45,8 @@ public class TxMvccInfo implements Message {
}
/**
- * @param crd
- * @param mvccVer
+ * @param crd Coordinator node ID.
+ * @param mvccVer Mvcc version.
*/
public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) {
assert crd != null;
@@ -53,10 +56,28 @@ public class TxMvccInfo implements Message {
this.mvccVer = mvccVer;
}
- public UUID coordinator() {
+ /**
+ * @return Instance with version without active transactions.
+ */
+ public TxMvccInfo withoutActiveTransactions() {
+ MvccCoordinatorVersion mvccVer0 = mvccVer.withoutActiveTransactions();
+
+ if (mvccVer0 == mvccVer)
+ return this;
+
+ return new TxMvccInfo(crd, mvccVer0);
+ }
+
+ /**
+ * @return Coordinator node ID.
+ */
+ public UUID coordinatorNodeId() {
return crd;
}
+ /**
+ * @return Mvcc version.
+ */
public MvccCoordinatorVersion version() {
return mvccVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index e5a9736..5fc38ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1261,12 +1261,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
KeyCacheObject key,
@Nullable CacheObject val,
GridCacheVersion ver,
+ long expireTime,
MvccCoordinatorVersion mvccVer)
throws IgniteCheckedException
{
CacheDataStore delegate = init0(false);
- return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer);
+ return delegate.mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer);
}
/** {@inheritDoc} */
@@ -1276,10 +1277,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
+ long expireTime,
MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- return delegate.mvccUpdate(cctx, primary, key, val, ver, mvccVer);
+ return delegate.mvccUpdate(cctx, primary, key, val, ver, expireTime, mvccVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b0cfa2d..5db0d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -381,6 +382,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
return mvccInfo;
}
+ /**
+ * @return Mvcc version for update operation, should be always initialized if mvcc is enabled.
+ */
+ @Nullable protected final MvccCoordinatorVersion mvccVersionForUpdate() {
+ assert !txState().mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this;
+
+ return mvccInfo != null ? mvccInfo.version() : null;
+ }
+
/** {@inheritDoc} */
@Override public void mvccInfo(TxMvccInfo mvccInfo) {
this.mvccInfo = mvccInfo;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d8f911c..4321ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -520,8 +520,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
try {
cctx.tm().txContext(this);
- assert !txState.mvccEnabled(cctx) || mvccInfo != null;
-
AffinityTopologyVersion topVer = topologyVersion();
/*
@@ -700,7 +698,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
resolveTaskName(),
dhtVer,
null,
- mvccInfo != null ? mvccInfo.version() : null);
+ mvccVersionForUpdate());
if (updRes.success()) {
txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -733,7 +731,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
resolveTaskName(),
dhtVer,
null,
- mvccInfo != null ? mvccInfo.version() : null);
+ mvccVersionForUpdate());
}
}
else if (op == DELETE) {
@@ -755,7 +753,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
resolveTaskName(),
dhtVer,
null,
- mvccInfo != null ? mvccInfo.version() : null);
+ mvccVersionForUpdate());
if (updRes.success()) {
txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -784,7 +782,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
resolveTaskName(),
dhtVer,
null,
- mvccInfo != null ? mvccInfo.version() : null);
+ mvccVersionForUpdate());
}
}
else if (op == RELOAD) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index fc82cbb..31aa2ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -81,6 +81,9 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
long mvccCntr = getMvccCounter(pageAddr, idx);
+ assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
+ assert mvccCntr != COUNTER_NA;
+
return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
hash,
link,
@@ -122,15 +125,15 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
if (storeMvccVersion()) {
long mvccTopVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx);
- long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
+ long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
- assert mvccTopVer > 0 : mvccTopVer;
- assert mvcCntr != COUNTER_NA;
+ assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
+ assert mvccCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccTopVer);
off += 8;
- PageUtils.putLong(dstPageAddr, off, mvcCntr);
+ PageUtils.putLong(dstPageAddr, off, mvccCntr);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index c956d22..47d8a6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -99,7 +99,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccCoordinatorVersion(srcPageAddr, srcIdx);
long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
- assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
+ assert unmaskCoordinatorVersion(mvccUpdateTopVer) > 0 : mvccUpdateCntr;
assert mvccUpdateCntr != COUNTER_NA;
PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 6309153..e8861bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -157,7 +157,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
cmp = compareKeys(row.key(), link);
if (cmp != 0 || !grp.mvccEnabled())
- return 0;
+ return cmp;
long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
index af11a9d..2785186 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -37,9 +37,10 @@ public class MvccRemoveRow extends MvccUpdateRow {
public MvccRemoveRow(
KeyCacheObject key,
MvccCoordinatorVersion mvccVer,
+ boolean needOld,
int part,
int cacheId) {
- super(key, null, null, mvccVer, part, cacheId);
+ super(key, null, null, 0L, mvccVer, needOld, part, cacheId);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 137ca28..fb2a6cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
/**
*
@@ -54,6 +56,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
/** */
private final MvccCoordinatorVersion mvccVer;
+ /** */
+ private final boolean needOld;
+
+ /** */
+ private CacheDataRow oldRow;
+
/**
* @param key Key.
* @param val Value.
@@ -66,12 +74,22 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
+ long expireTime,
MvccCoordinatorVersion mvccVer,
+ boolean needOld,
int part,
int cacheId) {
- super(key, val, ver, part, 0L, cacheId);
+ super(key, val, ver, part, expireTime, cacheId);
this.mvccVer = mvccVer;
+ this.needOld = needOld;
+ }
+
+ /**
+ * @return Old row.
+ */
+ public CacheDataRow oldRow() {
+ return oldRow;
}
/**
@@ -110,7 +128,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
if (cmp == 0)
cmp = Long.compare(mvccVer.counter(), rowCntr);
- // Can be equals if backup rebalanced value updated on primary.
+ // Can be equals if execute update on backup and backup already rebalanced value updated on primary.
assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() +
", updCntr=" + mvccVer.counter() +
", rowCrd=" + rowCrdVer +
@@ -148,9 +166,18 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
if (cmp == 0)
res = UpdateResult.VERSION_FOUND;
- else
- res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ?
+ else {
+ if (versionForRemovedValue(rowCrdVerMasked))
+ res = UpdateResult.PREV_NULL;
+ else {
+ res = UpdateResult.PREV_NOT_NULL;
+
+ if (needOld)
+ oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+ }
+ res = versionForRemovedValue(rowCrdVerMasked) ?
UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
+ }
}
// Suppose transactions on previous coordinator versions are done.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 30145ab..e6300a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -88,7 +88,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.dr.GridDrType;
@@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Version which is less then any version generated on coordinator. */
private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
- new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
+ new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L);
/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 859010e..58da451 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -187,8 +187,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
* @throws InterruptedException If interrupted.
*/
public void waitForBlocked() throws InterruptedException {
+ waitForBlocked(1);
+ }
+
+ /**
+ * @param size Number of messages to wait for.
+ * @throws InterruptedException If interrupted.
+ */
+ public void waitForBlocked(int size) throws InterruptedException {
synchronized (this) {
- while (blockedMsgs.isEmpty())
+ while (blockedMsgs.size() < size)
wait();
}
}