You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/14 11:11:15 UTC
[06/28] ignite git commit: IGNITE-264 - Check backup node for
one-phase transaction when primary node crashes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
index 80fdbbe..ea7b124 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
@@ -131,7 +131,7 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest {
checkCache(ignite, CU.MARSH_CACHE_NAME, MARSH_CACHE_POOL, false, false);
- checkCache(ignite, CU.ATOMICS_CACHE_NAME, SYSTEM_POOL, false, false);
+ checkCache(ignite, CU.ATOMICS_CACHE_NAME, SYSTEM_POOL, false, true);
for (String cache : userCaches)
checkCache(ignite, cache, SYSTEM_POOL, true, false);
@@ -157,4 +157,4 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest {
assertEquals("Unexpected property for cache: " + cache.name(), user, cache.context().userCache());
assertEquals("Unexpected property for cache: " + cache.name(), sysTx, cache.context().systemTx());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java
new file mode 100644
index 0000000..87c160f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearSelfTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Checks one-phase commit scenarios.
+ */
+public class IgniteOnePhaseCommitNearSelfTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 4;
+
+ /** */
+ private int backups = 1;
+
+ /** */
+ private static Map<Class<?>, AtomicInteger> msgCntMap = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+
+ cfg.setCommunicationSpi(new MessageCountingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration cacheConfiguration(String gridName) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setBackups(backups);
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+ ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOnePhaseCommitFromNearNode() throws Exception {
+ backups = 1;
+
+ startGrids(GRID_CNT);
+
+ try {
+ awaitPartitionMapExchange();
+
+ int key = generateNearKey();
+
+ IgniteCache<Object, Object> cache = ignite(0).cache(null);
+
+ checkKey(ignite(0).transactions(), cache, key);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param transactions Transactions instance.
+ * @param cache Cache instance.
+ * @param key Key.
+ */
+ private void checkKey(IgniteTransactions transactions, Cache<Object, Object> cache, int key) throws Exception {
+ cache.put(key, key);
+
+ finalCheck(key, true);
+
+ TransactionIsolation[] isolations = {READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE};
+ TransactionConcurrency[] concurrencies = {OPTIMISTIC, PESSIMISTIC};
+
+ for (TransactionIsolation isolation : isolations) {
+ for (TransactionConcurrency concurrency : concurrencies) {
+ info("Checking transaction [isolation=" + isolation + ", concurrency=" + concurrency + ']');
+
+ try (Transaction tx = transactions.txStart(concurrency, isolation)) {
+ cache.put(key, isolation + "-" + concurrency);
+
+ tx.commit();
+ }
+
+ finalCheck(key, true);
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void finalCheck(final int key, boolean onePhase) throws Exception {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ for (int i = 0; i < GRID_CNT; i++) {
+ GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite(i)).internalCache();
+
+ GridCacheEntryEx entry = cache.peekEx(key);
+
+ if (entry != null) {
+ if (entry.lockedByAny()) {
+ info("Near entry is still locked [i=" + i + ", entry=" + entry + ']');
+
+ return false;
+ }
+ }
+
+ entry = cache.context().near().dht().peekEx(key);
+
+ if (entry != null) {
+ if (entry.lockedByAny()) {
+ info("DHT entry is still locked [i=" + i + ", entry=" + entry + ']');
+
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ info("Entry was removed, will retry");
+
+ return false;
+ }
+ }
+ }, 10_000);
+
+ if (onePhase) {
+ assertMessageCount(GridNearTxPrepareRequest.class, 1);
+ assertMessageCount(GridDhtTxPrepareRequest.class, 1);
+ assertMessageCount(GridNearTxFinishRequest.class, 1);
+ assertMessageCount(GridDhtTxFinishRequest.class, 0);
+
+ msgCntMap.clear();
+ }
+ }
+
+ /**
+ * @param cls Class to check.
+ * @param cnt Expected count.
+ */
+ private void assertMessageCount(Class<?> cls, int cnt) {
+ AtomicInteger val = msgCntMap.get(cls);
+
+ int iVal = val == null ? 0 : val.get();
+
+ assertEquals("Invalid message count for class: " + cls.getSimpleName(), cnt, iVal);
+ }
+
+ /**
+ * @return Key.
+ */
+ protected int generateNearKey() {
+ Affinity<Object> aff = ignite(0).affinity(null);
+
+ int key = 0;
+
+ while (true) {
+ boolean primary = aff.isPrimary(ignite(1).cluster().localNode(), key);
+ boolean primaryOrBackup = aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), key);
+
+ if (primary && !primaryOrBackup)
+ return key;
+
+ key++;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class MessageCountingCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ throws IgniteSpiException {
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage ioMsg = (GridIoMessage)msg;
+
+ Class<?> cls = ioMsg.message().getClass();
+
+ AtomicInteger cntr = msgCntMap.get(cls);
+
+ if (cntr == null)
+ cntr = F.addIfAbsent(msgCntMap, cls, new AtomicInteger());
+
+ cntr.incrementAndGet();
+ }
+
+ super.sendMessage(node, msg, ackClosure);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
index e712cc9..5bc779c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -98,7 +99,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
startGrid(1);
startGrid(2);
- PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+ final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
@@ -127,7 +128,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
assert !cache.containsKey(key);
- assert !lsnr.lostParts.isEmpty();
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return !lsnr.lostParts.isEmpty();
+ }
+ }, getTestTimeout());
}
/**
@@ -139,7 +144,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
awaitPartitionMapExchange();
- PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+ final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
ignite(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
@@ -157,7 +162,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
assert !jcache(1).containsKey(key);
- assert !lsnr.lostParts.isEmpty();
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return !lsnr.lostParts.isEmpty();
+ }
+ }, getTestTimeout());
}
/**
@@ -172,7 +181,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
startGrid(0);
- PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+ final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
grid(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
@@ -206,7 +215,7 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
startGrid(1);
- PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
+ final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
grid(1).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
@@ -235,7 +244,11 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
awaitPartitionMapExchange();
- assert !lsnr.lostParts.isEmpty();
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return !lsnr.lostParts.isEmpty();
+ }
+ }, getTestTimeout());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
index 27b201f..806d8b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
@@ -182,7 +182,8 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
startGrids(GRID_CNT);
- awaitPartitionMapExchange();
+ if (cacheMode == REPLICATED)
+ awaitPartitionMapExchange();
ignites = new Ignite[GRID_CNT];
ids = new UUID[GRID_CNT];
@@ -624,4 +625,4 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
return null;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
new file mode 100644
index 0000000..1135c40
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import javax.cache.CacheException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests one-phase commit transactions when some of the nodes fail in the middle of the transaction.
+ */
+@SuppressWarnings("unchecked")
+public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
+ /**
+ * @return Grid count.
+ */
+ public int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ cfg.setCommunicationSpi(new BanningCommunicationSpi());
+
+ return cfg;
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration cacheConfiguration(String gridName) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg.setBackups(1);
+
+ return ccfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitPessimistic() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitOptimistic() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackPessimistic() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackOptimistic() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackPessimisticOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackOptimisticOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitImplicit() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupCommitImplicitOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackImplicit() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureBackupRollbackImplicitOnBackup() throws Exception {
+ checkPrimaryNodeFailureBackupCommit(null, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkPrimaryNodeFailureBackupCommit(
+ final TransactionConcurrency conc,
+ boolean backup,
+ final boolean commit
+ ) throws Exception {
+ startGrids(gridCount());
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < gridCount(); i++)
+ info("Grid " + i + ": " + ignite(i).cluster().localNode().id());
+
+ try {
+ final Ignite ignite = ignite(0);
+
+ final IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ final int key = generateKey(ignite, backup);
+
+ IgniteEx backupNode = (IgniteEx)backupNode(key, null);
+
+ assertNotNull(backupNode);
+
+ final CountDownLatch commitLatch = new CountDownLatch(1);
+
+ if (!commit) {
+ communication(1).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareRequest.class));
+ }
+ else {
+ if (!backup) {
+ communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+ communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+ }
+ else
+ communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+ }
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ if (conc != null) {
+ try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ)) {
+ cache.put(key, key);
+
+ Transaction asyncTx = (Transaction)tx.withAsync();
+
+ asyncTx.commit();
+
+ commitLatch.countDown();
+
+ try {
+ IgniteFuture<Object> fut = asyncTx.future();
+
+ fut.get();
+
+ if (!commit) {
+ error("Transaction has been committed");
+
+ fail("Transaction has been committed: " + tx);
+ }
+ }
+ catch (TransactionRollbackException e) {
+ if (commit) {
+ error(e.getMessage(), e);
+
+ fail("Failed to commit: " + e);
+ }
+ else
+ assertTrue(X.hasCause(e, TransactionRollbackException.class));
+ }
+ }
+ }
+ else {
+ IgniteCache<Object, Object> cache0 = cache.withAsync();
+
+ cache0.put(key, key);
+
+ Thread.sleep(1000);
+
+ commitLatch.countDown();
+
+ try {
+ cache0.future().get();
+
+ if (!commit) {
+ error("Transaction has been committed");
+
+ fail("Transaction has been committed.");
+ }
+ }
+ catch (CacheException e) {
+ if (commit) {
+ error(e.getMessage(), e);
+
+ fail("Failed to commit: " + e);
+ }
+ else
+ assertTrue(X.hasCause(e, TransactionRollbackException.class));
+ }
+ }
+
+ return null;
+ }
+ });
+
+ commitLatch.await();
+
+ stopGrid(1);
+
+ // Check that thread successfully finished.
+ fut.get();
+
+ // Check there are no hanging transactions.
+ assertEquals(0, ((IgniteEx)ignite(0)).context().cache().context().tm().idMapSize());
+ assertEquals(0, ((IgniteEx)ignite(2)).context().cache().context().tm().idMapSize());
+ assertEquals(0, ((IgniteEx)ignite(3)).context().cache().context().tm().idMapSize());
+
+ dataCheck((IgniteKernal)ignite(0), (IgniteKernal)backupNode, key, commit);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param orig Originating cache.
+ * @param backup Backup cache.
+ * @param key Key being committed and checked.
+ * @param commit Commit or rollback flag.
+ * @throws Exception If check failed.
+ */
+ private void dataCheck(IgniteKernal orig, IgniteKernal backup, int key, boolean commit) throws Exception {
+ GridNearCacheEntry nearEntry = null;
+
+ GridCacheAdapter origCache = orig.internalCache(null);
+
+ if (origCache.isNear())
+ nearEntry = (GridNearCacheEntry)origCache.peekEx(key);
+
+ GridCacheAdapter backupCache = backup.internalCache(null);
+
+ if (backupCache.isNear())
+ backupCache = backupCache.context().near().dht();
+
+ GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)backupCache.peekEx(key);
+
+ if (commit) {
+ assertNotNull(dhtEntry);
+ assertTrue("dhtEntry=" + dhtEntry, dhtEntry.remoteMvccSnapshot().isEmpty());
+ assertTrue("dhtEntry=" + dhtEntry, dhtEntry.localCandidates().isEmpty());
+ assertEquals(key, backupCache.localPeek(key, null, null));
+
+ if (nearEntry != null) {
+ assertTrue("near=" + nearEntry, nearEntry.remoteMvccSnapshot().isEmpty());
+ assertTrue("near=" + nearEntry, nearEntry.localCandidates().isEmpty());
+
+ // Near peek wil be null since primary node has changed.
+ assertNull("near=" + nearEntry, origCache.localPeek(key, null, null));
+ }
+ }
+ else {
+ assertTrue("near=" + nearEntry + ", hc=" + System.identityHashCode(nearEntry), nearEntry == null);
+ assertTrue("Invalid backup cache entry: " + dhtEntry,
+ dhtEntry == null || dhtEntry.rawGetOrUnmarshal(false) == null);
+ }
+ }
+
+ /**
+ * @param idx Index.
+ * @return Communication SPI.
+ */
+ private BanningCommunicationSpi communication(int idx) {
+ return (BanningCommunicationSpi)ignite(idx).configuration().getCommunicationSpi();
+ }
+
+ /**
+ * @param ignite Ignite instance to generate key.
+ * @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for
+ * {@code ignite(1)}.
+ */
+ private int generateKey(Ignite ignite, boolean backup) {
+ Affinity<Object> aff = ignite.affinity(null);
+
+ for (int key = 0;;key++) {
+ if (backup) {
+ if (!aff.isBackup(ignite(0).cluster().localNode(), key))
+ continue;
+ }
+ else {
+ if (aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), key))
+ continue;
+ }
+
+ if (aff.isPrimary(ignite(1).cluster().localNode(), key))
+ return key;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class BanningCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private volatile Collection<Class> bannedClasses = Collections.emptyList();
+
+ /**
+ * @param bannedClasses Banned classes.
+ */
+ public void bannedClasses(Collection<Class> bannedClasses) {
+ this.bannedClasses = bannedClasses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+ GridIoMessage ioMsg = (GridIoMessage)msg;
+
+ if (!bannedClasses.contains(ioMsg.message().getClass())) {
+ super.sendMessage(node, msg, ackClosure);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java
new file mode 100644
index 0000000..5735182
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNearCacheTxNodeFailureSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+/**
+ *
+ */
+public class GridNearCacheTxNodeFailureSelfTest extends GridCacheTxNodeFailureSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) {
+ return super.cacheConfiguration(gridName).setNearConfiguration(new NearCacheConfiguration());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
new file mode 100644
index 0000000..cee54b8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstractTest {
+ /**
+ * Grid count.
+ */
+ private static final int GRID_CNT = 5;
+
+ /**
+ * Restart count.
+ */
+ private static final int RESTART_CNT = 15;
+
+ /**
+ * Atomic long name.
+ */
+ private static final String ATOMIC_LONG_NAME = "test-atomic-long";
+
+ /**
+ * Queue.
+ */
+ private final Queue<Long> queue = new ConcurrentLinkedQueue<>();
+
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ AtomicConfiguration atomicCfg = new AtomicConfiguration();
+ atomicCfg.setCacheMode(CacheMode.PARTITIONED);
+ atomicCfg.setBackups(1);
+
+ cfg.setAtomicConfiguration(atomicCfg);
+
+ return cfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ queue.clear();
+ }
+
+ /**
+ *
+ */
+ public void testQueueCreateNodesJoin() throws Exception {
+ CountDownLatch startLatch = new CountDownLatch(GRID_CNT);
+ final AtomicBoolean run = new AtomicBoolean(true);
+
+ Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < GRID_CNT; i++)
+ futs.add(startNodeAndCreaterThread(i, startLatch, run));
+
+ startLatch.await();
+
+ info("All nodes started.");
+
+ Thread.sleep(10_000);
+
+ run.set(false);
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
+
+ info("Increments: " + queue.size());
+
+ assert !queue.isEmpty();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIncrementConsistency() throws Exception {
+ startGrids(GRID_CNT);
+
+ final AtomicBoolean run = new AtomicBoolean(true);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception {
+ IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+ while (run.get())
+ queue.add(cntr.getAndIncrement());
+
+ return null;
+ }
+ }, 4, "increment-runner");
+
+ for (int i = 0; i < RESTART_CNT; i++) {
+ int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1;
+
+ stopGrid(restartIdx);
+
+ U.sleep(500);
+
+ startGrid(restartIdx);
+ }
+
+ run.set(false);
+
+ fut.get();
+
+ info("Increments: " + queue.size());
+
+ checkQueue();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueClose() throws Exception {
+ startGrids(GRID_CNT);
+
+ int threads = 4;
+
+ final AtomicBoolean run = new AtomicBoolean(true);
+ final AtomicInteger idx = new AtomicInteger();
+ final AtomicReferenceArray<Exception> arr = new AtomicReferenceArray<>(threads);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ /** {@inheritDoc} */
+ @Override
+ public Void call() throws Exception {
+ int base = idx.getAndIncrement();
+
+ try {
+ int delta = 0;
+
+ while (run.get()) {
+ IgniteAtomicLong cntr = ignite(0).atomicLong(ATOMIC_LONG_NAME + "-" + base + "-" + delta, 0, true);
+
+ for (int i = 0; i < 5; i++)
+ queue.add(cntr.getAndIncrement());
+
+ cntr.close();
+
+ delta++;
+ }
+ }
+ catch (Exception e) {
+ arr.set(base, e);
+
+ throw e;
+ }
+ finally {
+ info("RUNNER THREAD IS STOPPING");
+ }
+
+ return null;
+ }
+ }, threads, "increment-runner");
+
+ for (int i = 0; i < RESTART_CNT; i++) {
+ int restartIdx = ThreadLocalRandom.current().nextInt(GRID_CNT - 1) + 1;
+
+ stopGrid(restartIdx);
+
+ U.sleep(500);
+
+ startGrid(restartIdx);
+ }
+
+ run.set(false);
+
+ fut.get();
+
+ for (int i = 0; i < threads; i++) {
+ Exception err = arr.get(i);
+
+ if (err != null)
+ throw err;
+ }
+ }
+
+ /**
+ *
+ */
+ private void checkQueue() {
+ List<Long> list = new ArrayList<>(queue);
+
+ Collections.sort(list);
+
+ boolean failed = false;
+
+ int delta = 0;
+
+ for (int i = 0; i < list.size(); i++) {
+ Long exp = (long)(i + delta);
+
+ Long actual = list.get(i);
+
+ if (!exp.equals(actual)) {
+ failed = true;
+
+ delta++;
+
+ info(">>> Expected " + exp + ", actual " + actual);
+ }
+ }
+
+ assertFalse(failed);
+ }
+
+ /**
+ * @param i Node index.
+ */
+ private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i, final CountDownLatch startLatch, final AtomicBoolean run)
+ throws Exception {
+ return multithreadedAsync(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Ignite ignite = startGrid(i);
+
+ startLatch.countDown();
+
+ while (run.get()) {
+ IgniteAtomicLong cntr = ignite.atomicLong(ATOMIC_LONG_NAME, 0, true);
+
+ queue.add(cntr.getAndIncrement());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, 1, "grunner-" + i);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
index db55731..1d80ac1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearOnlyTxTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
@@ -90,11 +92,14 @@ public class IgniteCacheNearOnlyTxTest extends IgniteCacheAbstractTest {
IgniteCache<Integer, Integer> cache0 = ignite(0).cache(null);
IgniteCache<Integer, Integer> cache1 = ignite1.cache(null);
+ Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
for (int i = 0; i < 5; i++) {
log.info("Iteration: " + i);
- GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
+ futs.add(GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
int val = idx.getAndIncrement();
IgniteCache<Integer, Integer> cache = ignite1.cache(null);
@@ -104,10 +109,13 @@ public class IgniteCacheNearOnlyTxTest extends IgniteCacheAbstractTest {
return null;
}
- }, 5, "put-thread");
+ }, 5, "put-thread"));
assertEquals(cache0.localPeek(key), cache1.localPeek(key));
}
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 28f9a65..eaeb7b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -30,6 +30,9 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxNode
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtAtomicRemoveFailureTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtClientRemoveFailureTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtRemoveFailureTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheTxNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridNearCacheTxNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteAtomicLongChangingTopologySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryTransactionalSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientInvalidPartitionHandlingSelfTest;
@@ -97,7 +100,11 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheSizeFailoverTest.class);
suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class);
+ suite.addTestSuite(IgniteAtomicLongChangingTopologySelfTest.class);
+
+ suite.addTestSuite(GridCacheTxNodeFailureSelfTest.class);
+ suite.addTestSuite(GridNearCacheTxNodeFailureSelfTest.class);
return suite;
}
-}
\ No newline at end of file
+}