You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/29 16:50:43 UTC
[03/50] [abbrv] ignite git commit: IGNITE-8920 - Fixed transaction
hanging on Runtime exceptions during commit. - Unified StorageException and
PersistentStorageIOException. - BPlus tree behavior is overridable in tests.
- Refactored AccountTransferAmount
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java b/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
deleted file mode 100644
index 8d7cf15..0000000
--- a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.failure;
-
-import javax.management.MBeanServer;
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
-import org.apache.ignite.mxbean.WorkersControlMXBean;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.Transaction;
-import org.jetbrains.annotations.NotNull;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
-/**
- * Test transfer amount between accounts with enabled {@link StopNodeFailureHandler}.
- */
-public class AccountTransferTransactionTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
- /** Count of accounts in one thread. */
- private static final int ACCOUNTS_CNT = 20;
- /** Count of threads and caches. */
- private static final int THREADS_CNT = 20;
- /** Count of nodes to start. */
- private static final int NODES_CNT = 3;
- /** Count of transaction on cache. */
- private static final int TRANSACTION_CNT = 10;
-
- /** {@inheritDoc} */
- @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
- return new StopNodeFailureHandler();
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
- final IgniteConfiguration cfg = super.getConfiguration(name);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
- cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
- cfg.setLocalHost("127.0.0.1");
- cfg.setDataStorageConfiguration(new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setMaxSize(50 * 1024 * 1024)
- .setPersistenceEnabled(true))
- );
-
- CacheConfiguration[] cacheConfigurations = new CacheConfiguration[THREADS_CNT];
- for (int i = 0; i < THREADS_CNT; i++) {
- cacheConfigurations[i] = new CacheConfiguration()
- .setName(cacheName(i))
- .setAffinity(new RendezvousAffinityFunction(false, 32))
- .setBackups(1)
- .setAtomicityMode(TRANSACTIONAL)
- .setCacheMode(CacheMode.PARTITIONED)
- .setWriteSynchronizationMode(FULL_SYNC)
- .setEvictionPolicy(new FifoEvictionPolicy(1000))
- .setOnheapCacheEnabled(true);
- }
-
- cfg.setCacheConfiguration(cacheConfigurations);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /**
- * Test transfer amount.
- */
- public void testTransferAmount() throws Exception {
- //given: started some nodes with client.
- startGrids(NODES_CNT);
-
- IgniteEx igniteClient = startGrid(getClientConfiguration(NODES_CNT));
-
- igniteClient.cluster().active(true);
-
- Random random = new Random();
-
- long[] initAmount = new long[THREADS_CNT];
-
- //and: fill all accounts on all caches and calculate total amount for every cache.
- for (int cachePrefixIdx = 0; cachePrefixIdx < THREADS_CNT; cachePrefixIdx++) {
- IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
-
- try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- for (int accountId = 0; accountId < ACCOUNTS_CNT; accountId++) {
- Long amount = (long)random.nextInt(1000);
-
- cache.put(accountId, amount);
-
- initAmount[cachePrefixIdx] += amount;
- }
-
- tx.commit();
- }
- }
-
- //when: start transfer amount from account to account in different threads.
- CountDownLatch firstTransactionDone = new CountDownLatch(THREADS_CNT);
-
- ArrayList<Thread> transferThreads = new ArrayList<>();
-
- for (int i = 0; i < THREADS_CNT; i++) {
- transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i)));
-
- transferThreads.get(i).start();
- }
-
- firstTransactionDone.await(10, TimeUnit.SECONDS);
-
- //and: terminate disco-event-worker thread on one node.
- WorkersControlMXBean bean = workersMXBean(1);
-
- bean.terminateWorker(
- bean.getWorkerNames().stream()
- .filter(name -> name.startsWith("disco-event-worker"))
- .findFirst()
- .orElse(null)
- );
-
- for (Thread thread : transferThreads) {
- thread.join();
- }
-
- long[] resultAmount = new long[THREADS_CNT];
-
- //then: calculate total amount for every thread.
- for (int j = 0; j < THREADS_CNT; j++) {
- String cacheName = cacheName(j);
-
- IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName);
-
- try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-
- for (int i = 0; i < ACCOUNTS_CNT; i++)
- resultAmount[j] += getNotNullValue(cache, i);
- tx.commit();
- }
-
- long diffAmount = initAmount[j] - resultAmount[j];
-
- //and: check that result amount equal to init amount.
- assertTrue(
- String.format("Total amount before and after transfer is not same: diff=%s, cache=%s",
- diffAmount, cacheName),
- diffAmount == 0
- );
- }
- }
-
- /**
- * Make test cache name by prefix.
- */
- @NotNull private String cacheName(int cachePrefixIdx) {
- return "cache" + cachePrefixIdx;
- }
-
- /**
- * Ignite configuration for client.
- */
- @NotNull private IgniteConfiguration getClientConfiguration(int nodesPrefix) throws Exception {
- IgniteConfiguration clientConf = getConfiguration(getTestIgniteInstanceName(nodesPrefix));
-
- clientConf.setClientMode(true);
-
- return clientConf;
- }
-
- /**
- * Extract not null value from cache.
- */
- private long getNotNullValue(IgniteCache<Object, Object> cache, int i) {
- Object value = cache.get(i);
-
- return value == null ? 0 : ((Long)value);
- }
-
- /**
- * Configure workers mx bean.
- */
- private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception {
- ObjectName mbeanName = U.makeMBeanName(
- getTestIgniteInstanceName(igniteInt),
- "Kernal",
- WorkersControlMXBeanImpl.class.getSimpleName()
- );
-
- MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
-
- if (!mbeanSrv.isRegistered(mbeanName))
- fail("MBean is not registered: " + mbeanName.getCanonicalName());
-
- return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, WorkersControlMXBean.class, true);
- }
-
- /**
- *
- */
- private static class TransferAmountTxThread extends Thread {
- /** */
- private CountDownLatch firstTransactionLatch;
- /** */
- private Ignite ignite;
- /** */
- private String cacheName;
- /** */
- private Random random = new Random();
-
- /**
- * @param ignite Ignite.
- */
- private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final Ignite ignite, String cacheName) {
- this.firstTransactionLatch = firstTransactionLatch;
- this.ignite = ignite;
- this.cacheName = cacheName;
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- for (int i = 0; i < TRANSACTION_CNT; i++) {
- try {
- updateInTransaction(ignite.cache(cacheName));
- }
- finally {
- if (i == 0)
- firstTransactionLatch.countDown();
- }
- }
- }
-
- /**
- * @throws IgniteException if fails
- */
- @SuppressWarnings("unchecked")
- private void updateInTransaction(IgniteCache cache) throws IgniteException {
- int accIdFrom = random.nextInt(ACCOUNTS_CNT);
- int accIdTo = random.nextInt(ACCOUNTS_CNT);
-
- if (accIdFrom == accIdTo)
- accIdTo = (int)getNextAccountId(accIdFrom);
-
- Long acctFrom;
- Long acctTo;
-
- try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- acctFrom = (Long)cache.get(accIdFrom);
- acctTo = (Long)cache.get(accIdTo);
-
- long transactionAmount = (long)(random.nextDouble() * acctFrom);
-
- cache.put(accIdFrom, acctFrom - transactionAmount);
- cache.put(accIdTo, acctTo + transactionAmount);
-
- tx.commit();
- }
- }
-
- /**
- * @param curr current
- * @return random value
- */
- private long getNextAccountId(long curr) {
- long randomVal;
-
- do {
- randomVal = random.nextInt(ACCOUNTS_CNT);
- }
- while (curr == randomVal);
-
- return randomVal;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index fc2a7d6..40025f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
index 379b8c3..5a1a6fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.TestFailureHandler;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index ecc7b03..3d62fe1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
new file mode 100644
index 0000000..fe27e6e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.NotNull;
+import org.jsr166.ConcurrentLinkedHashMap;
+import org.junit.Assert;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Test transfer amount between accounts with enabled {@link StopNodeFailureHandler}.
+ *
+ * This test can be extended to emulate failover scenarios during transactional operations on the grid.
+ */
+public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
+ /** IP finder instance installed into the discovery SPI of every node configured by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Default number of accounts created in every cache (see {@link #accountsCount()}). */
+ private static final int DFLT_ACCOUNTS_CNT = 32;
+
+ /** Default number of transfer threads; one cache is configured per thread (see {@link #txThreadsCount()}). */
+ private static final int DFLT_TX_THREADS_CNT = 20;
+
+ /** Default number of nodes to start before the client node (see {@link #nodesCount()}). */
+ private static final int DFLT_NODES_CNT = 3;
+
+ /** Default number of transfer attempts performed by every transfer thread (see {@link #transactionsCount()}). */
+ private static final int DFLT_TRANSACTIONS_CNT = 10;
+
+ /**
+ * Completed transactions, one map per cache index: transaction id to recorded {@link TxState}.
+ * The array is created in {@link #doTestTransferAmount(FailoverScenario)} and is only used for diagnostics
+ * when the consistency check fails.
+ */
+ private ConcurrentLinkedHashMap[] completedTxs;
+
+ /**
+ * @return Number of nodes started with {@code startGrids} before the client node is started.
+ */
+ protected int nodesCount() {
+ return DFLT_NODES_CNT;
+ }
+
+ /**
+ * @return Number of accounts created in every cache. The same value is passed to the cache affinity function
+ * as its partition count.
+ */
+ protected int accountsCount() {
+ return DFLT_ACCOUNTS_CNT;
+ }
+
+ /**
+ * @return Number of transfer attempts every transfer thread performs on its cache.
+ */
+ protected int transactionsCount() {
+ return DFLT_TRANSACTIONS_CNT;
+ }
+
+ /**
+ * @return Number of transfer threads. One cache is configured per thread, so this is also the number of caches.
+ */
+ protected int txThreadsCount() {
+ return DFLT_TX_THREADS_CNT;
+ }
+
+ /**
+ * @return {@code true} if indexed types should be registered on the account caches
+ * (enables the secondary index on {@link AccountState}); {@code false} by default.
+ */
+ protected boolean indexed() {
+ return false;
+ }
+
+ /**
+ * @return {@code true} if persistence should be enabled on the default data region used by the account caches;
+ * {@code true} by default.
+ */
+ protected boolean persistent() {
+ return true;
+ }
+
+ /**
+ * @return {@code true} to allow cross-node transactions, i.e. transfers between accounts whose primary
+ * partitions are spread across several cluster nodes; {@code false} restricts every transfer to two accounts
+ * that have the same primary node.
+ */
+ protected boolean crossNodeTransactions() {
+ // Commit error during cross node transactions breaks transaction integrity
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-9086
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ // Every node of this test gets its own StopNodeFailureHandler instance.
+ return new StopNodeFailureHandler();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * On top of the base configuration this sets the instance name as consistent id, installs the shared
+  * IP finder and a {@link TestRecordingCommunicationSpi}, binds the node to {@code 127.0.0.1}, configures
+  * a 256 MB default data region (persistent or not, see {@link #persistent()}) and declares one
+  * transactional partitioned cache with a single backup per transfer thread.
+  */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+     final IgniteConfiguration igniteCfg = super.getConfiguration(name);
+
+     igniteCfg.setConsistentId(name);
+
+     TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+     discoSpi.setIpFinder(IP_FINDER);
+
+     igniteCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+     igniteCfg.setLocalHost("127.0.0.1");
+
+     DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+         .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+             .setMaxSize(256 * 1024 * 1024)
+             .setPersistenceEnabled(persistent()));
+
+     igniteCfg.setDataStorageConfiguration(storageCfg);
+
+     // One cache per transfer thread.
+     CacheConfiguration[] accountCaches = new CacheConfiguration[txThreadsCount()];
+
+     for (int cacheIdx = 0; cacheIdx < txThreadsCount(); cacheIdx++) {
+         CacheConfiguration accountCache = new CacheConfiguration()
+             .setName(cacheName(cacheIdx))
+             .setAffinity(new RendezvousAffinityFunction(false, accountsCount()))
+             .setBackups(1)
+             .setAtomicityMode(TRANSACTIONAL)
+             .setCacheMode(CacheMode.PARTITIONED)
+             .setWriteSynchronizationMode(FULL_SYNC)
+             .setReadFromBackup(true)
+             .setOnheapCacheEnabled(true);
+
+         // NOTE(review): the caches are accessed with Integer keys elsewhere in this class, while IgniteUuid
+         // is registered as the indexed key type here - confirm this is intended.
+         if (indexed())
+             accountCache.setIndexedTypes(IgniteUuid.class, AccountState.class);
+
+         accountCaches[cacheIdx] = accountCache;
+     }
+
+     igniteCfg.setCacheConfiguration(accountCaches);
+
+     return igniteCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Start every test from a clean state: no running grids and an empty persistence directory.
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Stop the grids started by the test and remove the persistence files they left behind.
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Make test cache name by prefix.
+ */
+ @NotNull private String cacheName(int cachePrefixIdx) {
+ return "cache" + cachePrefixIdx;
+ }
+
+ /**
+  * Creates the configuration of the client node.
+  *
+  * @param nodesPrefix Index used to derive the test Ignite instance name of the client.
+  * @return Regular test configuration for that instance name, switched to client mode.
+  * @throws Exception If the underlying configuration cannot be created.
+  */
+ @NotNull private IgniteConfiguration getClientConfiguration(int nodesPrefix) throws Exception {
+     String clientName = getTestIgniteInstanceName(nodesPrefix);
+
+     IgniteConfiguration clientCfg = getConfiguration(clientName);
+
+     clientCfg.setClientMode(true);
+
+     return clientCfg;
+ }
+
+ /**
+  * Runs the coin transfer scenario: starts the grid and a client, fills every account cache with uniquely
+  * numbered coins, lets one thread per cache transfer coins between accounts, invokes the callbacks of the
+  * given failover scenario at the corresponding stages and finally verifies that no coin was lost or
+  * duplicated.
+  *
+  * @param failoverScenario Callbacks emulating a failure during the transfers.
+  * @throws Exception If the grid cannot be started, a callback fails or the test thread is interrupted.
+  */
+ public void doTestTransferAmount(FailoverScenario failoverScenario) throws Exception {
+     failoverScenario.beforeNodesStarted();
+
+     //given: started some nodes with client.
+     startGrids(nodesCount());
+
+     IgniteEx igniteClient = startGrid(getClientConfiguration(nodesCount()));
+
+     igniteClient.cluster().active(true);
+
+     int[] initAmount = new int[txThreadsCount()];
+     completedTxs = new ConcurrentLinkedHashMap[txThreadsCount()];
+
+     //and: fill all accounts on all caches and calculate total amount for every cache.
+     for (int cachePrefixIdx = 0; cachePrefixIdx < txThreadsCount(); cachePrefixIdx++) {
+         IgniteCache<Integer, AccountState> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
+
+         // Coins are numbered 1..N per cache, so the counter value is also the total amount of coins.
+         AtomicInteger coinsCounter = new AtomicInteger();
+
+         try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+             for (int accountId = 0; accountId < accountsCount(); accountId++) {
+                 Set<Integer> initialAmount = generateCoins(coinsCounter, 5);
+
+                 cache.put(accountId, new AccountState(accountId, tx.xid(), initialAmount));
+             }
+
+             tx.commit();
+         }
+
+         initAmount[cachePrefixIdx] = coinsCounter.get();
+         completedTxs[cachePrefixIdx] = new ConcurrentLinkedHashMap<IgniteUuid, TxState>();
+     }
+
+     //when: start transfer amount from account to account in different threads.
+     CountDownLatch firstTransactionDone = new CountDownLatch(txThreadsCount());
+
+     List<Thread> transferThreads = new ArrayList<>();
+
+     for (int i = 0; i < txThreadsCount(); i++) {
+         Thread transferThread = new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i), i);
+
+         transferThreads.add(transferThread);
+
+         transferThread.start();
+     }
+
+     // The scenario still proceeds after the timeout, but the timeout is no longer silent: a failure injected
+     // before every thread has finished its first transaction is now visible in the test log.
+     if (!firstTransactionDone.await(10, TimeUnit.SECONDS))
+         log.warning("Not all transfer threads have finished their first transaction in 10 seconds " +
+             "[remaining=" + firstTransactionDone.getCount() + ']');
+
+     failoverScenario.afterFirstTransaction();
+
+     for (Thread thread : transferThreads)
+         thread.join();
+
+     failoverScenario.afterTransactionsFinished();
+
+     consistencyCheck(initAmount);
+ }
+
+ /**
+  * Reads every account of every cache on every started node and checks that the coins found are exactly
+  * the coins that were initially generated for that cache (numbered {@code 1..initAmount[cache]}):
+  * nothing is lost and nothing is duplicated. Comparing only the number of coins is not sufficient, since
+  * one lost coin and one duplicated coin would cancel each other out.
+  * <p>
+  * On a violation the lost and duplicate coins, the recorded transactions and the account states are
+  * logged and the test fails.
+  *
+  * @param initAmount Number of coins initially generated for every cache, by cache index.
+  */
+ private void consistencyCheck(int[] initAmount) {
+     for (Ignite node : G.allGrids()) {
+         for (int j = 0; j < txThreadsCount(); j++) {
+             String cacheName = cacheName(j);
+
+             IgniteCache<Integer, AccountState> cache = node.getOrCreateCache(cacheName);
+
+             AccountState[] accStates = new AccountState[accountsCount()];
+
+             Set<Integer> seenCoins = new HashSet<>();
+             Set<Integer> duplicateCoins = new HashSet<>();
+
+             int totalCoins = 0;
+
+             try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                 for (int i = 0; i < accountsCount(); i++) {
+                     AccountState state = cache.get(i);
+
+                     Assert.assertNotNull("Account state has lost [node=" + node.name() + ", cache=" + cacheName + ", accNo=" + i + "]", state);
+
+                     for (Integer coin : state.coins) {
+                         totalCoins++;
+
+                         // A coin that was already seen in another account is a duplicate.
+                         if (!seenCoins.add(coin))
+                             duplicateCoins.add(coin);
+                     }
+
+                     accStates[i] = state;
+                 }
+
+                 tx.commit();
+             }
+
+             Set<Integer> lostCoins = new HashSet<>();
+
+             for (int coin = 1; coin <= initAmount[j]; coin++)
+                 if (!seenCoins.contains(coin))
+                     lostCoins.add(coin);
+
+             if (totalCoins == initAmount[j] && lostCoins.isEmpty() && duplicateCoins.isEmpty())
+                 continue;
+
+             log.error("Transaction integrity failed for [node=" + node.name() + ", cache=" + cacheName + "]");
+
+             log.error(String.format("Total amount of coins before and after transfers are not same. Lost coins: %s. Duplicate coins: %s.",
+                 lostCoins,
+                 duplicateCoins));
+
+             ConcurrentLinkedHashMap<IgniteUuid, TxState> txs = completedTxs[j];
+
+             for (TxState tx : txs.values())
+                 log.error("Tx: " + tx);
+
+             for (int i = 0; i < accountsCount(); i++)
+                 log.error("Account state " + i + " = " + accStates[i]);
+
+             Assert.fail("Test failed. See messages above");
+         }
+     }
+ }
+
+ /**
+  * Immutable state of a single account: the account id, the id of the transaction that produced this state
+  * and the set of uniquely numbered coins the account holds. Changing the coins always creates a new
+  * instance.
+  */
+ public static class AccountState {
+     /** Account id. */
+     private final int accId;
+
+     /** Last performed transaction id on account state. */
+     @QuerySqlField(index = true)
+     private final IgniteUuid txId;
+
+     /** Set of coins held by the account (unmodifiable view). */
+     private final Set<Integer> coins;
+
+     /**
+      * @param accId Account id.
+      * @param txId Id of the transaction that produced this state.
+      * @param coins Coins held by the account.
+      */
+     public AccountState(int accId, IgniteUuid txId, Set<Integer> coins) {
+         this.txId = txId;
+         this.coins = Collections.unmodifiableSet(coins);
+         this.accId = accId;
+     }
+
+     /**
+      * Picks a random, possibly empty, strict subset of the coins of this account. At least one coin is
+      * always left on the account.
+      *
+      * @param random Randomizer.
+      * @return Coins to transfer from this account; an empty set if the account holds no coins.
+      */
+     public Set<Integer> coinsToTransfer(Random random) {
+         // Random.nextInt(0) throws IllegalArgumentException, and there is nothing to transfer anyway.
+         if (coins.isEmpty())
+             return new HashSet<>();
+
+         int coinsNum = random.nextInt(coins.size());
+
+         return coins.stream().limit(coinsNum).collect(Collectors.toSet());
+     }
+
+     /**
+      * @param txId Transaction id.
+      * @param coinsToAdd Coins to add to current account.
+      * @return New account state with the given coins added.
+      */
+     public AccountState addCoins(IgniteUuid txId, Set<Integer> coinsToAdd) {
+         return new AccountState(accId, txId, Sets.union(coins, coinsToAdd).immutableCopy());
+     }
+
+     /**
+      * @param txId Transaction id.
+      * @param coinsToRemove Coins to remove from current account.
+      * @return New account state with the given coins removed.
+      */
+     public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRemove) {
+         return new AccountState(accId, txId, Sets.difference(coins, coinsToRemove).immutableCopy());
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean equals(Object o) {
+         if (this == o)
+             return true;
+
+         if (o == null || getClass() != o.getClass())
+             return false;
+
+         AccountState that = (AccountState)o;
+
+         // The account id is part of the identity: states of different accounts are never equal.
+         return accId == that.accId &&
+             Objects.equals(txId, that.txId) &&
+             Objects.equals(coins, that.coins);
+     }
+
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         return Objects.hash(accId, txId, coins);
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return "AccountState{" +
+             "accId=" + accId +
+             ", coins=" + coins +
+             '}';
+     }
+ }
+
+ /**
+  * Draws the requested number of coin numbers from the given counter.
+  *
+  * @param coinsCounter Counter that is incremented once per generated coin.
+  * @param coinsNum Coins number.
+  * @return Set containing the generated coin numbers.
+  */
+ private Set<Integer> generateCoins(AtomicInteger coinsCounter, int coinsNum) {
+     Set<Integer> coins = new HashSet<>();
+
+     int remaining = coinsNum;
+
+     while (remaining > 0) {
+         coins.add(coinsCounter.incrementAndGet());
+
+         remaining--;
+     }
+
+     return coins;
+ }
+
+ /**
+ * State representing transaction between two accounts.
+ */
+ static class TxState {
+ /**
+ * Account states before transaction.
+ */
+ AccountState before1, before2;
+
+ /**
+ * Account states after transaction.
+ */
+ AccountState after1, after2;
+
+ /**
+ * Transferred coins between accounts during this transaction.
+ */
+ Set<Integer> transferredCoins;
+
+ /**
+ * @param before1 Before 1.
+ * @param before2 Before 2.
+ * @param after1 After 1.
+ * @param after2 After 2.
+ * @param transferredCoins Transferred coins.
+ */
+ public TxState(AccountState before1, AccountState before2, AccountState after1, AccountState after2, Set<Integer> transferredCoins) {
+ this.before1 = before1;
+ this.before2 = before2;
+ this.after1 = after1;
+ this.after2 = after2;
+ this.transferredCoins = transferredCoins;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TxState{" +
+ "before1=" + before1 +
+ ", before2=" + before2 +
+ ", transferredCoins=" + transferredCoins +
+ ", after1=" + after1 +
+ ", after2=" + after2 +
+ '}';
+ }
+ }
+
+ /**
+  * Thread that repeatedly transfers a random subset of coins between two random accounts of one cache,
+  * each transfer in its own pessimistic repeatable-read transaction. Committed transfers are recorded in
+  * {@code completedTxs} under the index of this thread.
+  */
+ private class TransferAmountTxThread extends Thread {
+     /** Latch counted down once the first transfer attempt of this thread has ended (successfully or not). */
+     private final CountDownLatch firstTransactionLatch;
+
+     /** Node used to start transactions and to access the cache. */
+     private final IgniteEx ignite;
+
+     /** Name of the cache this thread works with. */
+     private final String cacheName;
+
+     /** Index of this thread; selects the map of completed transactions to record into. */
+     private final int txIndex;
+
+     /** Randomizer for account and coin selection. */
+     private final Random random = new Random();
+
+     /**
+      * @param firstTransactionLatch Latch to count down after the first transfer attempt.
+      * @param ignite Ignite.
+      * @param cacheName Name of the cache to run transfers on.
+      * @param txIndex Index of this thread.
+      */
+     private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final IgniteEx ignite, String cacheName, int txIndex) {
+         this.firstTransactionLatch = firstTransactionLatch;
+         this.ignite = ignite;
+         this.cacheName = cacheName;
+         this.txIndex = txIndex;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void run() {
+         for (int i = 0; i < transactionsCount(); i++) {
+             try {
+                 updateInTransaction(ignite.cache(cacheName));
+             }
+             finally {
+                 // Release the latch even if the very first transfer has failed.
+                 if (i == 0)
+                     firstTransactionLatch.countDown();
+             }
+         }
+     }
+
+     /**
+      * Performs a single transfer between two distinct random accounts. Unless cross-node transactions are
+      * allowed, only pairs of accounts with the same primary node are picked.
+      *
+      * @param cache Cache holding the accounts.
+      * @throws IgniteException if fails
+      */
+     @SuppressWarnings("unchecked") // completedTxs is an array of raw maps.
+     private void updateInTransaction(IgniteCache<Integer, AccountState> cache) throws IgniteException {
+         int accIdFrom;
+         int accIdTo;
+
+         for (;;) {
+             accIdFrom = random.nextInt(accountsCount());
+             accIdTo = random.nextInt(accountsCount());
+
+             if (accIdFrom == accIdTo)
+                 continue;
+
+             // Any pair of distinct accounts is fine when cross-node transactions are allowed.
+             if (crossNodeTransactions())
+                 break;
+
+             // Otherwise only accounts that are primary on the same node may take part in one transaction.
+             if (primaryNode(accIdFrom).id().equals(primaryNode(accIdTo).id()))
+                 break;
+         }
+
+         AccountState acctFrom;
+         AccountState acctTo;
+
+         try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+             acctFrom = cache.get(accIdFrom);
+             acctTo = cache.get(accIdTo);
+
+             Set<Integer> coinsToTransfer = acctFrom.coinsToTransfer(random);
+
+             AccountState nextFrom = acctFrom.removeCoins(tx.xid(), coinsToTransfer);
+             AccountState nextTo = acctTo.addCoins(tx.xid(), coinsToTransfer);
+
+             cache.put(accIdFrom, nextFrom);
+             cache.put(accIdTo, nextTo);
+
+             tx.commit();
+
+             // Reached only if the commit did not throw.
+             completedTxs[txIndex].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer));
+         }
+     }
+
+     /**
+      * @param accId Account id.
+      * @return Node the cache affinity maps the given account id to.
+      */
+     private ClusterNode primaryNode(int accId) {
+         return ignite.cachex(cacheName).affinity().mapKeyToNode(accId);
+     }
+ }
+
+ /**
+ * Interface to implement custom failover scenario during transactional amount transfer.
+ */
+ public interface FailoverScenario {
+ /**
+ * Callback before nodes have started.
+ */
+ public default void beforeNodesStarted() throws Exception { }
+
+ /**
+ * Callback when first transaction has finished.
+ */
+ public default void afterFirstTransaction() throws Exception { }
+
+ /**
+ * Callback when all transactions have finished.
+ */
+ public default void afterTransactionsFinished() throws Exception { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
new file mode 100644
index 0000000..3260607
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.cache.tree.SearchRow;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test cases that check transaction data integrity after transaction commit failed.
+ */
+public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends AbstractTransactionIntergrityTest {
+ /** Corruption enabled flag. */
+ private static volatile boolean corruptionEnabled;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ corruptionEnabled = false;
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60 * 1000L;
+ }
+
+ /**
+ * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+ */
+ public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode1() throws Exception {
+ doTestTransferAmount(new IndexCorruptionFailoverScenario(
+ true,
+ (hnd, tree) -> hnd instanceof BPlusTree.Search,
+ failoverPredicate(true, () -> new AssertionError("Test")))
+ );
+ }
+
+ /**
+ * Throws a test {@link RuntimeException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+ */
+ public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode2() throws Exception {
+ doTestTransferAmount(new IndexCorruptionFailoverScenario(
+ true,
+ (hnd, tree) -> hnd instanceof BPlusTree.Search,
+ failoverPredicate(true, () -> new RuntimeException("Test")))
+ );
+ }
+
+ /**
+ * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+ */
+ public void testPrimaryIndexCorruptionDuringCommitOnBackupNode() throws Exception {
+ doTestTransferAmount(new IndexCorruptionFailoverScenario(
+ true,
+ (hnd, tree) -> hnd instanceof BPlusTree.Search,
+ failoverPredicate(false, () -> new AssertionError("Test")))
+ );
+ }
+
+ /**
+ * Throws a test {@link IgniteCheckedException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+ */
+ public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode3() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9082");
+
+ doTestTransferAmount(new IndexCorruptionFailoverScenario(
+ false,
+ (hnd, tree) -> hnd instanceof BPlusTree.Search,
+ failoverPredicate(true, () -> new IgniteCheckedException("Test")))
+ );
+ }
+
+ /**
+ * Creates failover predicate which generates error during transaction commmit.
+ *
+ * @param failOnPrimary If {@code true} index should be failed on transaction primary node.
+ * @param errorSupplier Supplier to create various errors.
+ */
+ private BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate(
+ boolean failOnPrimary,
+ Supplier<Throwable> errorSupplier
+ ) {
+ return (ignite, row) -> {
+ int cacheId = row.cacheId();
+ int partId = row.key().partition();
+
+ final ClusterNode locNode = ignite.localNode();
+ final AffinityTopologyVersion curTopVer = ignite.context().discovery().topologyVersionEx();
+
+ // Throw exception if current node is primary for given row.
+ return ignite.cachesx(c -> c.context().cacheId() == cacheId)
+ .stream()
+ .filter(c -> c.context().affinity().primaryByPartition(locNode, partId, curTopVer) == failOnPrimary)
+ .map(c -> errorSupplier.get())
+ .findFirst()
+ .orElse(null);
+ };
+ }
+
+ /**
+ * Index corruption failover scenario.
+ */
+ class IndexCorruptionFailoverScenario implements FailoverScenario {
+ /** Failed node index. */
+ static final int failedNodeIdx = 1;
+
+ /** Is node stopping expected after failover. */
+ private final boolean nodeStoppingExpected;
+
+ /** Predicate that will choose an instance of {@link BPlusTree} and page operation
+ * to make further failover in this tree using {@link #failoverPredicate}. */
+ private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate;
+
+ /** Function that may return error during row insertion into {@link BPlusTree}. */
+ private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate;
+
+ /**
+ * @param nodeStoppingExpected Node stopping expected.
+ * @param treeCorruptionPredicate Tree corruption predicate.
+ * @param failoverPredicate Failover predicate.
+ */
+ IndexCorruptionFailoverScenario(
+ boolean nodeStoppingExpected,
+ BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate,
+ BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate
+ ) {
+ this.nodeStoppingExpected = nodeStoppingExpected;
+ this.treeCorruptionPredicate = treeCorruptionPredicate;
+ this.failoverPredicate = failoverPredicate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeNodesStarted() {
+ BPlusTree.pageHndWrapper = (tree, hnd) -> {
+ final IgniteEx locIgnite = (IgniteEx) Ignition.localIgnite();
+
+ if (!locIgnite.name().endsWith(String.valueOf(failedNodeIdx)))
+ return hnd;
+
+ if (treeCorruptionPredicate.apply(hnd, tree)) {
+ log.info("Created corrupted tree handler for -> " + hnd + " " + tree);
+
+ PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>) hnd;
+
+ return new PageHandler<BPlusTree.Get, BPlusTree.Result>() {
+ @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io, Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException {
+ log.info("Invoked " + " " + cacheId + " " + arg.toString() + " for BTree (" + corruptionEnabled + ") -> " + arg.row() + " / " + arg.row().getClass());
+
+ if (corruptionEnabled && (arg.row() instanceof SearchRow)) {
+ SearchRow row = (SearchRow) arg.row();
+
+ // Store cacheId to search row explicitly, as it can be zero if there is one cache in a group.
+ Throwable res = failoverPredicate.apply(locIgnite, new SearchRow(cacheId, row.key()));
+
+ if (res != null) {
+ if (res instanceof Error)
+ throw (Error) res;
+ else if (res instanceof RuntimeException)
+ throw (RuntimeException) res;
+ else if (res instanceof IgniteCheckedException)
+ throw (IgniteCheckedException) res;
+ }
+ }
+
+ return delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, lvl);
+ }
+
+ @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr, BPlusTree.Get g, int lvl) {
+ return g.canRelease(pageId, lvl);
+ }
+ };
+ }
+
+ return hnd;
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterFirstTransaction() {
+ // Enable BPlus tree corruption after first transactions have finished.
+ corruptionEnabled = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterTransactionsFinished() throws Exception {
+ // Disable index corruption.
+ BPlusTree.pageHndWrapper = (tree, hnd) -> hnd;
+
+ if (nodeStoppingExpected) {
+ // Wait until node with corrupted index will left cluster.
+ GridTestUtils.waitForCondition(() -> {
+ try {
+ grid(failedNodeIdx);
+ }
+ catch (IgniteIllegalStateException e) {
+ return true;
+ }
+
+ return false;
+ }, getTestTimeout());
+
+ // Failed node should be stopped.
+ GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, "");
+
+ // Re-start failed node.
+ startGrid(failedNodeIdx);
+
+ awaitPartitionMapExchange();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
new file mode 100644
index 0000000..25aae4b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
+import org.apache.ignite.mxbean.WorkersControlMXBean;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ *
+ */
+public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTransactionIntergrityTest {
+ /** {@inheritDoc}. */
+ @Override protected long getTestTimeout() {
+ return 60 * 1000L;
+ }
+
+ /** {@inheritDoc}. */
+ @Override protected boolean persistent() {
+ return false;
+ }
+
+ /**
+ *
+ */
+ public void testFailoverWithDiscoWorkerTermination() throws Exception {
+ doTestTransferAmount(new FailoverScenario() {
+ static final int failedNodeIdx = 1;
+
+ /** {@inheritDoc}. */
+ @Override public void afterFirstTransaction() throws Exception {
+ // Terminate disco-event-worker thread on one node.
+ WorkersControlMXBean bean = workersMXBean(failedNodeIdx);
+
+ bean.terminateWorker(
+ bean.getWorkerNames().stream()
+ .filter(name -> name.startsWith("disco-event-worker"))
+ .findFirst()
+ .orElse(null)
+ );
+ }
+
+ /** {@inheritDoc}. */
+ @Override public void afterTransactionsFinished() throws Exception {
+ // Wait until node with death worker will left cluster.
+ GridTestUtils.waitForCondition(() -> {
+ try {
+ grid(failedNodeIdx);
+ }
+ catch (IgniteIllegalStateException e) {
+ return true;
+ }
+
+ return false;
+ }, getTestTimeout());
+
+ // Failed node should be stopped.
+ GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, "");
+
+ // Re-start failed node.
+ startGrid(failedNodeIdx);
+
+ awaitPartitionMapExchange();
+ }
+ });
+ }
+
+ /**
+ * Configure workers mx bean.
+ */
+ private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception {
+ ObjectName mbeanName = U.makeMBeanName(
+ getTestIgniteInstanceName(igniteInt),
+ "Kernal",
+ WorkersControlMXBeanImpl.class.getSimpleName()
+ );
+
+ MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+ if (!mbeanSrv.isRegistered(mbeanName))
+ fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+ return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, WorkersControlMXBean.class, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 74e23ed..ac2bed3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -17,6 +17,7 @@
package org.apache.ignite.testsuites;
+import java.util.Set;
import junit.framework.TestSuite;
import org.apache.ignite.GridSuppressedExceptionSelfTest;
import org.apache.ignite.failure.FailureHandlerTriggeredTest;
@@ -55,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestor
import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExchangeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithSystemWorkerDeathTest;
import org.apache.ignite.internal.processors.closure.GridClosureProcessorRemoteTest;
import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest;
import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest;
@@ -90,8 +92,6 @@ import org.apache.ignite.testframework.test.VariationsIteratorTest;
import org.apache.ignite.util.AttributeNodeFilterSelfTest;
import org.jetbrains.annotations.Nullable;
-import java.util.Set;
-
/**
* Basic test suite.
*/
@@ -211,6 +211,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(StopNodeFailureHandlerTest.class);
suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class);
suite.addTestSuite(OomFailureHandlerTest.class);
+ suite.addTestSuite(TransactionIntegrityWithSystemWorkerDeathTest.class);
suite.addTestSuite(AtomicOperationsInTxTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 5f1d18d..3171754 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -17,8 +17,8 @@
package org.apache.ignite.testsuites;
+import java.util.Set;
import junit.framework.TestSuite;
-import org.apache.ignite.failure.AccountTransferTransactionTest;
import org.apache.ignite.failure.IoomFailureHandlerTest;
import org.apache.ignite.failure.SystemWorkersTerminationTest;
import org.apache.ignite.internal.ClusterBaselineNodesMetricsSelfTest;
@@ -29,8 +29,6 @@ import org.apache.ignite.util.GridCommandHandlerTest;
import org.apache.ignite.util.GridInternalTaskUnusedWalSegmentsTest;
import org.jetbrains.annotations.Nullable;
-import java.util.Set;
-
/**
* Basic test suite.
*/
@@ -52,7 +50,6 @@ public class IgniteBasicWithPersistenceTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Basic With Persistence Test Suite");
suite.addTestSuite(IoomFailureHandlerTest.class);
- suite.addTestSuite(AccountTransferTransactionTest.class);
suite.addTestSuite(ClusterBaselineNodesMetricsSelfTest.class);
suite.addTestSuite(ServiceDeploymentOnActivationTest.class);
suite.addTestSuite(ServiceDeploymentOutsideBaselineTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index f9e6b81..1269d0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -26,13 +26,11 @@ import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDist
import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheClientsConcurrentStartTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMultithreadedTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientMultiNodeUpdateTopologyLockTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
@@ -45,11 +43,11 @@ import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPa
import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest;
-import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
import org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest;
import org.apache.ignite.testframework.junits.GridAbstractTest;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 89e2ffc..2bd0861 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -17,6 +17,7 @@
package org.apache.ignite.testsuites;
+import java.util.Set;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.authentication.Authentication1kUsersNodeRestartTest;
import org.apache.ignite.internal.processors.authentication.AuthenticationConfigurationClusterTest;
@@ -38,11 +39,10 @@ import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridC
import org.apache.ignite.internal.processors.cache.eviction.paged.PageEvictionMultinodeMixedRegionsTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.CheckpointBufferDeadlockTest;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithPrimaryIndexCorruptionTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncWithPersistenceTest;
import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest;
-import java.util.Set;
-
/**
* Test suite.
*/
@@ -95,6 +95,8 @@ public class IgniteCacheTestSuite7 extends TestSuite {
suite.addTestSuite(CacheRentingStateRepairTest.class);
+ suite.addTestSuite(TransactionIntegrityWithPrimaryIndexCorruptionTest.class);
+
return suite;
}
}