You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2018/04/11 12:28:24 UTC
[02/10] ignite git commit: IGNITE-7871 Implemented additional
synchronization phase for correct partition counters update
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
new file mode 100644
index 0000000..bad1b61
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message is used to send acks for {@link Latch} instances management.
+ */
+public class LatchAckMessage implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Latch id. */
+ private String latchId;
+
+ /** Latch topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Flag indicates that ack is final. */
+ private boolean isFinal;
+
+ /**
+ * Constructor.
+ *
+ * @param latchId Latch id.
+ * @param topVer Latch topology version.
+ * @param isFinal Final acknowledgement flag.
+ */
+ public LatchAckMessage(String latchId, AffinityTopologyVersion topVer, boolean isFinal) {
+ this.latchId = latchId;
+ this.topVer = topVer;
+ this.isFinal = isFinal;
+ }
+
+ /**
+ * Empty constructor for marshalling purposes.
+ */
+ public LatchAckMessage() {
+ }
+
+ /**
+ * @return Latch id.
+ */
+ public String latchId() {
+ return latchId;
+ }
+
+ /**
+ * @return Latch topology version.
+ */
+ public AffinityTopologyVersion topVer() {
+ return topVer;
+ }
+
+ /**
+ * @return {@code} if ack is final.
+ */
+ public boolean isFinal() {
+ return isFinal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeBoolean("isFinal", isFinal))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeString("latchId", latchId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ isFinal = reader.readBoolean("isFinal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ latchId = reader.readString("latchId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(LatchAckMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 135;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 7785605..33f84f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3525,6 +3525,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
U.error(log, "Failed to prepare transaction: " + this, e);
}
+ catch (Throwable t) {
+ fut.onDone(t);
+
+ throw t;
+ }
if (err != null)
fut.rollbackOnError(err);
@@ -3544,6 +3549,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
U.error(log, "Failed to prepare transaction: " + this, e);
}
+ catch (Throwable t) {
+ fut.onDone(t);
+
+ throw t;
+ }
if (err != null)
fut.rollbackOnError(err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5cfd92d..68ec83d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -189,7 +189,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
freeList.saveMetadata();
long updCntr = store.updateCounter();
- int size = store.fullSize();
+ long size = store.fullSize();
long rmvId = globalRemoveId().get();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
@@ -318,7 +318,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
partMetaId,
updCntr,
rmvId,
- size,
+ (int)size, // TODO: Partition size may be long
cntrsPageId,
state == null ? -1 : (byte)state.ordinal(),
pageCnt
@@ -549,7 +549,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
final int grpId,
final int partId,
final int currAllocatedPageCnt,
- final int partSize
+ final long partSize
) {
if (part != null) {
boolean reserved = part.reserve();
@@ -1301,7 +1301,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public int fullSize() {
+ @Override public long fullSize() {
try {
CacheDataStore delegate0 = init0(true);
@@ -1313,7 +1313,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public int cacheSize(int cacheId) {
+ @Override public long cacheSize(int cacheId) {
try {
CacheDataStore delegate0 = init0(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 9bfaaf3..945ef48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -490,7 +490,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
@Override public AffinityTopologyVersion topologyVersion() {
AffinityTopologyVersion res = topVer;
- if (res.equals(AffinityTopologyVersion.NONE)) {
+ if (res == null || res.equals(AffinityTopologyVersion.NONE)) {
if (system()) {
AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fbdeca1..9fb8777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -545,10 +545,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param topVer Topology version.
* @return Future that will be completed when all ongoing transactions are finished.
*/
- public IgniteInternalFuture<Boolean> finishTxs(AffinityTopologyVersion topVer) {
+ public IgniteInternalFuture<Boolean> finishLocalTxs(AffinityTopologyVersion topVer) {
GridCompoundFuture<IgniteInternalTx, Boolean> res =
new CacheObjectsReleaseFuture<>(
- "Tx",
+ "LocalTx",
topVer,
new IgniteReducer<IgniteInternalTx, Boolean>() {
@Override public boolean collect(IgniteInternalTx e) {
@@ -561,8 +561,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
});
for (IgniteInternalTx tx : txs()) {
- if (needWaitTransaction(tx, topVer))
+ if (needWaitTransaction(tx, topVer)) {
res.add(tx.finishFuture());
+ }
}
res.markInitialized();
@@ -571,6 +572,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Creates a future that will wait for finishing all tx updates on backups after all local transactions are finished.
+ *
+ * NOTE:
+ * As we send finish request to backup nodes after transaction successfully completed on primary node
+ * it's important to ensure that all updates from primary to backup are finished or at least remote transaction has created on backup node.
+ *
+ * @param finishLocalTxsFuture Local transactions finish future.
+ * @param topVer Topology version.
+ * @return Future that will be completed when all ongoing transactions are finished.
+ */
+ public IgniteInternalFuture<?> finishAllTxs(IgniteInternalFuture<?> finishLocalTxsFuture, AffinityTopologyVersion topVer) {
+ final GridCompoundFuture finishAllTxsFuture = new CacheObjectsReleaseFuture("AllTx", topVer);
+
+ // After finishing all local updates, wait for finishing all tx updates on backups.
+ finishLocalTxsFuture.listen(future -> {
+ finishAllTxsFuture.add(cctx.mvcc().finishRemoteTxs(topVer));
+ finishAllTxsFuture.markInitialized();
+ });
+
+ return finishAllTxsFuture;
+ }
+
+ /**
* @param tx Transaction.
* @param topVer Exchange version.
* @return {@code True} if need wait transaction for exchange.
@@ -1834,12 +1858,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Finish future for related remote transactions.
*/
@SuppressWarnings("unchecked")
- public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
- GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+ public IgniteInternalFuture<IgniteInternalTx> remoteTxFinishFuture(GridCacheVersion nearVer) {
+ GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new GridCompoundFuture<>();
for (final IgniteInternalTx tx : txs()) {
if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
- fut.add((IgniteInternalFuture) tx.finishFuture());
+ fut.add(tx.finishFuture());
}
fut.markInitialized();
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
index 7263656..702b188 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
@@ -76,7 +76,7 @@ public class CacheDhtLocalPartitionAfterRemoveSelfTest extends GridCommonAbstrac
cache = grid(g).cache(DEFAULT_CACHE_NAME);
for (GridDhtLocalPartition p : dht(cache).topology().localPartitions()) {
- int size = p.dataStore().fullSize();
+ long size = p.dataStore().fullSize();
assertTrue("Unexpected size: " + size, size <= 32);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 468bbc8..6c570d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
new file mode 100644
index 0000000..52cd033
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Lists;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests for {@link ExchangeLatchManager} functionality when latch coordinator is failed.
+ */
+public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbstractTest {
+ /** */
+ private static final String LATCH_NAME = "test";
+
+ /** 5 nodes. */
+ private final AffinityTopologyVersion latchTopVer = new AffinityTopologyVersion(5, 0);
+
+ /** Wait before latch creation. */
+ private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCreate = (mgr, syncLatch) -> {
+ try {
+ syncLatch.countDown();
+ syncLatch.await();
+
+ Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+ distributedLatch.countDown();
+
+ distributedLatch.await();
+ } catch (Exception e) {
+ log.error("Unexpected exception", e);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ /** Wait before latch count down. */
+ private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCountDown = (mgr, syncLatch) -> {
+ try {
+ Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+ syncLatch.countDown();
+ syncLatch.await();
+
+ distributedLatch.countDown();
+
+ distributedLatch.await();
+ } catch (Exception e) {
+ log.error("Unexpected exception ", e);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ /** Wait after all operations are successful. */
+ private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> all = (mgr, syncLatch) -> {
+ try {
+ Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+ distributedLatch.countDown();
+
+ syncLatch.countDown();
+
+ distributedLatch.await();
+
+ syncLatch.await();
+ } catch (Exception e) {
+ log.error("Unexpected exception ", e);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Test scenarios description:
+ *
+ * We have existing coordinator and 4 other nodes.
+ * Each node do following operations:
+ * 1) Create latch
+ * 2) Countdown latch
+ * 3) Await latch
+ *
+ * While nodes do the operations we shutdown coordinator and next oldest node become new coordinator.
+ * We should check that new coordinator properly restored latch and all nodes finished latch completion successfully after that.
+ *
+ * Each node before coordinator shutdown can be in 3 different states:
+ *
+ * State {@link #beforeCreate} - Node didn't create latch yet.
+ * State {@link #beforeCountDown} - Node created latch but didn't count down it yet.
+ * State {@link #all} - Node created latch and count downed it.
+ *
+ * We should check important cases when future coordinator is in one of these states, and other 3 nodes have 3 different states.
+ */
+
+ /**
+ * Scenario 1:
+ *
+ * Node 1 state -> {@link #beforeCreate}
+ * Node 2 state -> {@link #beforeCountDown}
+ * Node 3 state -> {@link #all}
+ * Node 4 state -> {@link #beforeCreate}
+ */
+ public void testCoordinatorFail1() throws Exception {
+ List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+ beforeCreate,
+ beforeCountDown,
+ all,
+ beforeCreate
+ );
+
+ doTestCoordinatorFail(nodeStates);
+ }
+
+ /**
+ * Scenario 2:
+ *
+ * Node 1 state -> {@link #beforeCountDown}
+ * Node 2 state -> {@link #beforeCountDown}
+ * Node 3 state -> {@link #all}
+ * Node 4 state -> {@link #beforeCreate}
+ */
+ public void testCoordinatorFail2() throws Exception {
+ List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+ beforeCountDown,
+ beforeCountDown,
+ all,
+ beforeCreate
+ );
+
+ doTestCoordinatorFail(nodeStates);
+ }
+
+ /**
+ * Scenario 3:
+ *
+ * Node 1 state -> {@link #all}
+ * Node 2 state -> {@link #beforeCountDown}
+ * Node 3 state -> {@link #all}
+ * Node 4 state -> {@link #beforeCreate}
+ */
+ public void testCoordinatorFail3() throws Exception {
+ List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+ all,
+ beforeCountDown,
+ all,
+ beforeCreate
+ );
+
+ doTestCoordinatorFail(nodeStates);
+ }
+
+ /**
+ * Test latch coordinator fail with specified scenarios.
+ *
+ * @param nodeScenarios Node scenarios.
+ * @throws Exception If failed.
+ */
+ private void doTestCoordinatorFail(List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeScenarios) throws Exception {
+ IgniteEx crd = (IgniteEx) startGridsMultiThreaded(5);
+ crd.cluster().active(true);
+
+ // Latch to synchronize node states.
+ CountDownLatch syncLatch = new CountDownLatch(5);
+
+ GridCompoundFuture finishAllLatches = new GridCompoundFuture();
+
+ AtomicBoolean hasErrors = new AtomicBoolean();
+
+ for (int node = 1; node < 5; node++) {
+ IgniteEx grid = grid(node);
+ ExchangeLatchManager latchMgr = grid.context().cache().context().exchange().latch();
+ final int stateIdx = node - 1;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+ boolean success = nodeScenarios.get(stateIdx).apply(latchMgr, syncLatch);
+ if (!success)
+ hasErrors.set(true);
+ }, 1, "latch-runner-" + node);
+
+ finishAllLatches.add(fut);
+ }
+
+ finishAllLatches.markInitialized();
+
+ // Wait while all nodes reaches their states.
+ while (syncLatch.getCount() != 1) {
+ Thread.sleep(10);
+
+ if (hasErrors.get())
+ throw new Exception("All nodes should complete latches without errors");
+ }
+
+ crd.close();
+
+ // Resume progress for all nodes.
+ syncLatch.countDown();
+
+ // Wait for distributed latch completion.
+ finishAllLatches.get(5000);
+
+ Assert.assertFalse("All nodes should complete latches without errors", hasErrors.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
new file mode 100644
index 0000000..63d772a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ *
+ */
+public class GridCachePartitionsStateValidationTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** */
+ private boolean clientMode;
+
+ /** {@inheritDoc */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
+
+ cfg.setCommunicationSpi(new SingleMessageInterceptorCommunicationSpi(2));
+
+ if (clientMode)
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ clientMode = false;
+ }
+
+ /**
+ * Test that partitions state validation works correctly.
+ *
+ * @throws Exception If failed.
+ */
+ public void testValidationIfPartitionCountersAreInconsistent() throws Exception {
+ IgniteEx ignite = (IgniteEx) startGrids(2);
+ ignite.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ // Modify update counter for some partition.
+ for (GridDhtLocalPartition partition : ignite.cachex(CACHE_NAME).context().topology().localPartitions()) {
+ partition.updateCounter(100500L);
+ break;
+ }
+
+ // Trigger exchange.
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ // Nothing should happen (just log error message) and we're still able to put data to corrupted cache.
+ ignite.cache(CACHE_NAME).put(0, 0);
+
+ stopAllGrids();
+ }
+
+ /**
+ * Test that all nodes send correct {@link GridDhtPartitionsSingleMessage} with consistent update counters.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionCountersConsistencyOnExchange() throws Exception {
+ IgniteEx ignite = (IgniteEx) startGrids(4);
+ ignite.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ final String atomicCacheName = "atomic-cache";
+ final String txCacheName = "tx-cache";
+
+ clientMode = true;
+
+ Ignite client = startGrid(4);
+
+ clientMode = false;
+
+ IgniteCache atomicCache = client.getOrCreateCache(new CacheConfiguration<>(atomicCacheName)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setBackups(2)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
+
+ IgniteCache txCache = client.getOrCreateCache(new CacheConfiguration<>(txCacheName)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setBackups(2)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
+
+ for (int it = 0; it < 10; it++) {
+ SingleMessageInterceptorCommunicationSpi spi = (SingleMessageInterceptorCommunicationSpi) ignite.configuration().getCommunicationSpi();
+ spi.clear();
+
+ // Stop load future.
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ // Run atomic load.
+ IgniteInternalFuture atomicLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+ int k = 0;
+
+ while (!stop.get()) {
+ k++;
+ try {
+ atomicCache.put(k, k);
+ } catch (Exception ignored) {}
+ }
+ }, 1, "atomic-load");
+
+ // Run tx load.
+ IgniteInternalFuture txLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+ final int txOps = 5;
+
+ while (!stop.get()) {
+ List<Integer> randomKeys = Stream.generate(() -> ThreadLocalRandom.current().nextInt(5))
+ .limit(txOps)
+ .sorted()
+ .collect(Collectors.toList());
+
+ try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+ for (Integer key : randomKeys)
+ txCache.put(key, key);
+
+ tx.commit();
+ }
+ catch (Exception ignored) { }
+ }
+ }, 4, "tx-load");
+
+ // Wait for some data.
+ Thread.sleep(1000);
+
+ // Prevent sending full message.
+ spi.blockFullMessage();
+
+ // Trigger exchange.
+ IgniteInternalFuture nodeStopFuture = GridTestUtils.runAsync(() -> stopGrid(3));
+
+ try {
+ spi.waitUntilAllSingleMessagesAreSent();
+
+ List<GridDhtPartitionsSingleMessage> interceptedMessages = spi.getMessages();
+
+ // Associate each message with existing node UUID.
+ Map<UUID, GridDhtPartitionsSingleMessage> messagesMap = new HashMap<>();
+ for (int i = 0; i < interceptedMessages.size(); i++)
+ messagesMap.put(grid(i + 1).context().localNodeId(), interceptedMessages.get(i));
+
+ GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(ignite.context().cache().context());
+
+ // Validate partition update counters. If counters are not consistent, exception will be thrown.
+ validator.validatePartitionsUpdateCounters(ignite.cachex(atomicCacheName).context().topology(), messagesMap, Collections.emptySet());
+ validator.validatePartitionsUpdateCounters(ignite.cachex(txCacheName).context().topology(), messagesMap, Collections.emptySet());
+
+ } finally {
+ // Stop load and resume exchange.
+ spi.unblockFullMessage();
+
+ stop.set(true);
+
+ atomicLoadFuture.get();
+ txLoadFuture.get();
+ nodeStopFuture.get();
+ }
+
+ // Return grid to initial state.
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /**
+ * SPI which intercepts single messages during exchange.
+ */
+ private static class SingleMessageInterceptorCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private static final List<GridDhtPartitionsSingleMessage> messages = new CopyOnWriteArrayList<>();
+
+ /** Future completes when {@link #singleMessagesThreshold} messages are sent to coordinator. */
+ private static final GridFutureAdapter allSingleMessagesSent = new GridFutureAdapter();
+
+ /** A number of single messages we're waiting for send. */
+ private final int singleMessagesThreshold;
+
+ /** Latch which blocks full message sending. */
+ private volatile CountDownLatch blockFullMsgLatch;
+
+ /**
+ * Constructor.
+ */
+ private SingleMessageInterceptorCommunicationSpi(int singleMessagesThreshold) {
+ this.singleMessagesThreshold = singleMessagesThreshold;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ if (((GridIoMessage) msg).message() instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) ((GridIoMessage) msg).message();
+
+ // We're interesting for only exchange messages and when node is stopped.
+ if (singleMsg.exchangeId() != null && singleMsg.exchangeId().isLeft() && !singleMsg.client()) {
+ messages.add(singleMsg);
+
+ if (messages.size() == singleMessagesThreshold)
+ allSingleMessagesSent.onDone();
+ }
+ }
+
+ try {
+ if (((GridIoMessage) msg).message() instanceof GridDhtPartitionsFullMessage) {
+ if (blockFullMsgLatch != null)
+ blockFullMsgLatch.await();
+ }
+ }
+ catch (Exception ignored) { }
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /** */
+ public void clear() {
+ messages.clear();
+ allSingleMessagesSent.reset();
+ }
+
+ /** */
+ public List<GridDhtPartitionsSingleMessage> getMessages() {
+ return Collections.unmodifiableList(messages);
+ }
+
+ /** */
+ public void blockFullMessage() {
+ blockFullMsgLatch = new CountDownLatch(1);
+ }
+
+ /** */
+ public void unblockFullMessage() {
+ blockFullMsgLatch.countDown();
+ }
+
+ /** */
+ public void waitUntilAllSingleMessagesAreSent() throws IgniteCheckedException {
+ allSingleMessagesSent.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
new file mode 100644
index 0000000..9ed8d54
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Test correct behaviour of {@link GridDhtPartitionsStateValidator} class.
+ */
+public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstractTest {
+ /** Mocks and stubs. */
+ private final UUID localNodeId = UUID.randomUUID();
+ /** */
+ private GridCacheSharedContext cctxMock;
+ /** */
+ private GridDhtPartitionTopology topologyMock;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // Prepare mocks.
+ cctxMock = Mockito.mock(GridCacheSharedContext.class);
+ Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId);
+
+ topologyMock = Mockito.mock(GridDhtPartitionTopology.class);
+ Mockito.when(topologyMock.partitionState(Matchers.any(), Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING);
+ Mockito.when(topologyMock.groupId()).thenReturn(0);
+ Mockito.when(topologyMock.partitions()).thenReturn(3);
+
+ List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
+ partitionMock(0, 1, 1),
+ partitionMock(1, 2, 2),
+ partitionMock(2, 3, 3)
+ );
+ Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
+ Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
+ }
+
+ /**
+ * @return Partition mock with specified {@code id}, {@code updateCounter} and {@code size}.
+ */
+ private GridDhtLocalPartition partitionMock(int id, long updateCounter, long size) {
+ GridDhtLocalPartition partitionMock = Mockito.mock(GridDhtLocalPartition.class);
+ Mockito.when(partitionMock.id()).thenReturn(id);
+ Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter);
+ Mockito.when(partitionMock.fullSize()).thenReturn(size);
+ return partitionMock;
+ }
+
+ /**
+ * @return Message containing specified {@code countersMap}.
+ */
+ private GridDhtPartitionsSingleMessage fromUpdateCounters(Map<Integer, T2<Long, Long>> countersMap) {
+ GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+ msg.addPartitionUpdateCounters(0, countersMap);
+ return msg;
+ }
+
+ /**
+ * @return Message containing specified {@code sizesMap}.
+ */
+ private GridDhtPartitionsSingleMessage fromCacheSizes(Map<Integer, Long> sizesMap) {
+ GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+ msg.addPartitionSizes(0, sizesMap);
+ return msg;
+ }
+
+ /**
+ * Test partition update counters validation.
+ */
+ public void testPartitionCountersValidation() {
+ UUID remoteNode = UUID.randomUUID();
+ UUID ignoreNode = UUID.randomUUID();
+
+ // For partitions 0 and 2 (zero counter) we have inconsistent update counters.
+ Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>();
+ updateCountersMap.put(0, new T2<>(2L, 2L));
+ updateCountersMap.put(1, new T2<>(2L, 2L));
+
+ // Form single messages map.
+ Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+ messages.put(remoteNode, fromUpdateCounters(updateCountersMap));
+ messages.put(ignoreNode, fromUpdateCounters(updateCountersMap));
+
+ GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+ // (partId, (nodeId, updateCounter))
+ Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsUpdateCounters(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+ // Check that validation result contains all necessary information.
+ Assert.assertEquals(2, result.size());
+ Assert.assertTrue(result.containsKey(0));
+ Assert.assertTrue(result.containsKey(2));
+ Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+ Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+ Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+ Assert.assertTrue(result.get(2).get(remoteNode) == 0L);
+ }
+
+ /**
+ * Test partition cache sizes validation.
+ */
+ public void testPartitionCacheSizesValidation() {
+ UUID remoteNode = UUID.randomUUID();
+ UUID ignoreNode = UUID.randomUUID();
+
+ // For partitions 0 and 2 we have inconsistent cache sizes.
+ Map<Integer, Long> cacheSizesMap = new HashMap<>();
+ cacheSizesMap.put(0, 2L);
+ cacheSizesMap.put(1, 2L);
+ cacheSizesMap.put(2, 2L);
+
+ // Form single messages map.
+ Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+ messages.put(remoteNode, fromCacheSizes(cacheSizesMap));
+ messages.put(ignoreNode, fromCacheSizes(cacheSizesMap));
+
+ GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+ // (partId, (nodeId, cacheSize))
+ Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsSizes(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+ // Check that validation result contains all necessary information.
+ Assert.assertEquals(2, result.size());
+ Assert.assertTrue(result.containsKey(0));
+ Assert.assertTrue(result.containsKey(2));
+ Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+ Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+ Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+ Assert.assertTrue(result.get(2).get(remoteNode) == 2L);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
new file mode 100644
index 0000000..03ea0f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.T1;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class TxOptimisticOnPartitionExchangeTest extends GridCommonAbstractTest {
+ /** Nodes count. */
+ private static final int NODES_CNT = 3;
+
+ /** Tx size. */
+ private static final int TX_SIZE = 20 * NODES_CNT;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** Logger started. */
+ private static volatile boolean msgInterception;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODES_CNT);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi(log()));
+
+ cfg.setCacheConfiguration(defaultCacheConfiguration()
+ .setName(CACHE_NAME)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setCacheMode(PARTITIONED)
+ .setBackups(1));
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConsistencyOnPartitionExchange() throws Exception {
+ doTest(SERIALIZABLE, true);
+ doTest(READ_COMMITTED, true);
+ doTest(SERIALIZABLE, false);
+ doTest(READ_COMMITTED, false);
+ }
+
+ /**
+ * @param isolation {@link TransactionIsolation}.
+ * @param txInitiatorPrimary False If the transaction does not use the keys of the node that initiated it.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void doTest(final TransactionIsolation isolation, boolean txInitiatorPrimary) throws Exception {
+ final CountDownLatch txStarted = new CountDownLatch(1);
+
+ final IgniteCache cache = ignite(0).cache(CACHE_NAME);
+
+ final Map<Integer, Integer> txValues = new TreeMap<>();
+
+ ClusterNode node = ignite(0).cluster().node();
+
+ GridCacheAffinityManager affinity = ((IgniteCacheProxy)cache).context().affinity();
+
+ for (int i = 0; txValues.size() < TX_SIZE; i++) {
+ if (!txInitiatorPrimary && node.equals(affinity.primaryByKey(i, NONE)))
+ continue;
+
+ txValues.put(i, i);
+ }
+
+ TestCommunicationSpi.init();
+
+ msgInterception = true;
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ try (Transaction tx = ignite(0).transactions().txStart(OPTIMISTIC, isolation)) {
+ info(">>> TX started.");
+
+ txStarted.countDown();
+
+ cache.putAll(txValues);
+
+ tx.commit();
+
+ info(">>> TX committed.");
+ }
+
+ return null;
+ }
+ });
+
+ txStarted.await();
+
+ try {
+ info(">>> Grid starting.");
+
+ IgniteEx ignite = startGrid(NODES_CNT);
+
+ info(">>> Grid started.");
+
+ fut.get();
+
+ awaitPartitionMapExchange();
+
+ msgInterception = false;
+
+ IgniteCache<Object, Object> cacheStartedNode = ignite.cache(CACHE_NAME);
+
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Set<Object> keys = cacheStartedNode.getAll(txValues.keySet()).keySet();
+
+ assertEquals(txValues.keySet(), new TreeSet<>(keys));
+
+ tx.commit();
+ }
+ }
+ finally {
+ msgInterception = false;
+
+ stopGrid(NODES_CNT);
+ }
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("ConstantConditions")
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Partition single message sent from added node. */
+ private static volatile CountDownLatch partSingleMsgSentFromAddedNode;
+
+ /** Partition supply message sent count. */
+ private static final AtomicInteger partSupplyMsgSentCnt = new AtomicInteger();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /**
+ * @param log Logger.
+ */
+ public TestCommunicationSpi(IgniteLogger log) {
+ this.log = log;
+ }
+
+ /**
+ *
+ */
+ public static void init() {
+ partSingleMsgSentFromAddedNode = new CountDownLatch(1);
+
+ partSupplyMsgSentCnt.set(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(
+ final ClusterNode node,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC
+ ) throws IgniteSpiException {
+ if (msgInterception) {
+ if (msg instanceof GridIoMessage) {
+ final Message msg0 = ((GridIoMessage)msg).message();
+
+ String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+ int nodeIdx = Integer.parseInt(locNodeId.substring(locNodeId.length() - 3));
+
+ if (nodeIdx == 0) {
+ if (msg0 instanceof GridNearTxPrepareRequest || msg0 instanceof GridDhtTxPrepareRequest) {
+ GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ partSingleMsgSentFromAddedNode.await();
+
+ sendMessage(node, msg, ackC, true);
+
+ return null;
+ }
+ });
+
+ return;
+
+ }
+ else if (msg0 instanceof GridNearTxFinishRequest || msg0 instanceof GridDhtTxFinishRequest) {
+ GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ final T1<Integer> i = new T1<>(0);
+
+ while (waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return partSupplyMsgSentCnt.get() > i.get();
+ }
+ }, i.get() == 0 ? 5_000 : 500))
+ i.set(partSupplyMsgSentCnt.get());
+
+ sendMessage(node, msg, ackC, true);
+
+ return null;
+ }
+ });
+
+ return;
+ }
+ }
+ else if (nodeIdx == NODES_CNT && msg0 instanceof GridDhtPartitionsSingleMessage)
+ partSingleMsgSentFromAddedNode.countDown();
+
+ if (msg0 instanceof GridDhtPartitionSupplyMessage)
+ partSupplyMsgSentCnt.incrementAndGet();
+ }
+ }
+
+ sendMessage(node, msg, ackC, msgInterception);
+ }
+
+ /**
+ * @param node Node.
+ * @param msg Message.
+ * @param ackC Ack closure.
+ * @param logMsg Log Messages.
+ */
+ private void sendMessage(
+ final ClusterNode node,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC,
+ boolean logMsg
+ ) throws IgniteSpiException {
+ if (logMsg) {
+ String id = node.id().toString();
+ String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+ Message msg0 = ((GridIoMessage)msg).message();
+
+ log.info(
+ String.format(">>> Output msg[type=%s, fromNode= %s, toNode=%s]",
+ msg0.getClass().getSimpleName(),
+ locNodeId.charAt(locNodeId.length() - 1),
+ id.charAt(id.length() - 1)
+ )
+ );
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb397f7..0612615 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -133,6 +133,8 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCrossCacheT
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearCacheSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheGlobalLoadTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidationTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGetStoreErrorSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest;
@@ -292,6 +294,8 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class);
suite.addTestSuite(CacheDeferredDeleteSanitySelfTest.class);
suite.addTestSuite(CacheDeferredDeleteQueueTest.class);
+ suite.addTestSuite(GridCachePartitionsStateValidatorSelfTest.class);
+ suite.addTestSuite(GridCachePartitionsStateValidationTest.class);
suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite());
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index f8add30..415479d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim
import org.apache.ignite.internal.processors.cache.WalModeChangeAdvancedSelfTest;
import org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAffinityNodeSelfTest;
import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticT
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
@@ -93,6 +95,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
suite.addTestSuite(PartitionedTransactionalOptimisticCacheGetsDistributionTest.class);
suite.addTestSuite(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
+ suite.addTestSuite(TxOptimisticOnPartitionExchangeTest.class);
+
+ suite.addTestSuite(IgniteExchangeLatchManagerCoordinatorFailTest.class);
+
return suite;
}
}