You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/28 20:06:52 UTC
[3/3] ignite git commit: ignite-1.5 Do not allow 'committing' ->
'marked_rollback' tx state change.
ignite-1.5 Do not allow 'committing' -> 'marked_rollback' tx state change.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/992793c2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/992793c2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/992793c2
Branch: refs/heads/ignite-1537
Commit: 992793c22da3a37544951b0d7c72cc3a7e9f2e29
Parents: f7386b8
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 28 22:06:28 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 28 22:06:28 2015 +0300
----------------------------------------------------------------------
.../IgniteCacheSyncUpdateNodeFailureTest.java | 330 +++++++++++++++++++
1 file changed, 330 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/992793c2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSyncUpdateNodeFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSyncUpdateNodeFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSyncUpdateNodeFailureTest.java
new file mode 100644
index 0000000..eb937c0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSyncUpdateNodeFailureTest.java
@@ -0,0 +1,330 @@
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.MutableEntry;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
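+/**
+ * Tests updates of a {@code TRANSACTIONAL} cache with one backup and {@code FULL_SYNC} write
+ * synchronization mode when the primary node of the updated key is closed while the update is
+ * still in progress.
+ */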
+public class IgniteCacheSyncUpdateNodeFailureTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+     // Transactional cache with a single backup and fully synchronous writes.
+     CacheConfiguration cacheCfg = new CacheConfiguration();
+
+     cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+     cacheCfg.setBackups(1);
+     cacheCfg.setAtomicityMode(TRANSACTIONAL);
+     // Cache store is currently disabled:
+     //cacheCfg.setCacheStoreFactory(new TestStoreFactory());
+
+     IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+     // All test nodes discover each other through the shared IP finder.
+     TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+     discoSpi.setIpFinder(IP_FINDER);
+
+     igniteCfg.setCacheConfiguration(cacheCfg);
+
+     return igniteCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+     stopAllGrids();
+
+     super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+     // Every test method starts its own set of nodes with startGrids(...), so nodes left running
+     // by the previous test must be stopped before the next test starts nodes with the same names.
+     stopAllGrids();
+
+     super.afterTest();
+ }
+
+ /**
+  * Starts three nodes and asynchronously invokes {@link TestEntryProcessor} for a key returned by
+  * {@code nearKey(cache)}; the processor is configured to sleep on the key's backup node. The
+  * key's primary node is closed 500 ms after the update was started, then the test waits for the
+  * update to finish, logs the value, puts a new value and logs it again.
+  *
+  * @throws Exception If failed.
+  */
+ public void testFullSyncOnePhaseCommit() throws Exception {
+     startGrids(3);
+
+     awaitPartitionMapExchange();
+
+     final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+     final Integer key = nearKey(cache);
+
+     final String backupName = backupNode(key, null).name();
+
+     Ignite primaryIgnite = primaryNode(key, null);
+
+     Callable<Void> updateTask = new Callable<Void>() {
+         @Override public Void call() throws Exception {
+             log.info("Start update.");
+
+             // The processor sleeps only on the backup node.
+             Set<String> sleepNodes = Collections.singleton(backupName);
+
+             cache.invoke(key, new TestEntryProcessor(sleepNodes));
+
+             log.info("End update, value: " + cache.get(key));
+
+             return null;
+         }
+     };
+
+     IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(updateTask, "put-thread");
+
+     // Give the update time to start before closing the primary node.
+     U.sleep(500);
+
+     primaryIgnite.close();
+
+     updateFut.get();
+
+     U.sleep(5000);
+
+     log.info("Value at the end: " + cache.get(key));
+
+     cache.put(key, 2);
+
+     log.info("Value at the end: " + cache.get(key));
+// try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+// cache.put(key, 0);
+//
+// tx.commit();
+// }
+ }
+
+ /**
+  * Starts four nodes and asynchronously invokes {@link TestEntryProcessor} for a key returned by
+  * {@code nearKey(cache)}; the processor is configured to sleep on every node returned by
+  * {@code backupNodes(key, null)}. The key's primary node is closed 500 ms after the update was
+  * started; threads are dumped 5 seconds later, before waiting for the update to finish.
+  * <p>
+  * TODO: test with one backup and store, with two backups.
+  *
+  * @throws Exception If failed.
+  */
+ public void testFullSyncTxRecovery1() throws Exception {
+     startGrids(4);
+
+     awaitPartitionMapExchange();
+
+     final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+     final Integer key = nearKey(cache);
+
+     Ignite primaryIgnite = primaryNode(key, null);
+
+     Callable<Void> updateTask = new Callable<Void>() {
+         @Override public Void call() throws Exception {
+             log.info("Start update.");
+
+             // Names of all backup nodes: the processor sleeps on each of them.
+             Set<String> sleepNodes = new HashSet<>();
+
+             for (Ignite node : backupNodes(key, null))
+                 sleepNodes.add(node.name());
+
+             cache.invoke(key, new TestEntryProcessor(sleepNodes));
+
+             log.info("End update, do get");
+
+             log.info("End update, value: " + cache.get(key));
+
+             return null;
+         }
+     };
+
+     IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(updateTask, "put-thread");
+
+     // Give the update time to start before closing the primary node.
+     U.sleep(500);
+
+     primaryIgnite.close();
+
+     U.sleep(5000);
+
+     U.dumpThreads(log);
+
+     updateFut.get();
+
+     U.sleep(5000);
+
+     log.info("Value at the end: " + cache.get(key));
+ }
+
+ /**
+  * Same scenario as the other tx recovery tests, but the key is obtained with
+  * {@code backupKey(cache)} for the cache of node 0: four nodes are started, the update is invoked
+  * asynchronously with a processor sleeping on every node returned by
+  * {@code backupNodes(key, null)}, and the key's primary node is closed 500 ms later.
+  * <p>
+  * TODO: test with one backup and store, with two backups.
+  *
+  * @throws Exception If failed.
+  */
+ public void testFullSyncTxRecovery2() throws Exception {
+     startGrids(4);
+
+     awaitPartitionMapExchange();
+
+     final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+     final Integer key = backupKey(cache);
+
+     Ignite primaryIgnite = primaryNode(key, null);
+
+     Callable<Void> updateTask = new Callable<Void>() {
+         @Override public Void call() throws Exception {
+             log.info("Start update.");
+
+             // Names of all backup nodes: the processor sleeps on each of them.
+             Set<String> sleepNodes = new HashSet<>();
+
+             for (Ignite node : backupNodes(key, null))
+                 sleepNodes.add(node.name());
+
+             cache.invoke(key, new TestEntryProcessor(sleepNodes));
+
+             log.info("End update, do get");
+
+             log.info("End update, value: " + cache.get(key));
+
+             return null;
+         }
+     };
+
+     IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(updateTask, "put-thread");
+
+     // Give the update time to start before closing the primary node.
+     U.sleep(500);
+
+     primaryIgnite.close();
+
+     U.sleep(5000);
+
+     U.dumpThreads(log);
+
+     updateFut.get();
+
+     U.sleep(5000);
+
+     log.info("Value at the end: " + cache.get(key));
+ }
+
+ /**
+  * Starts three nodes and asynchronously invokes {@link TestEntryProcessor} for a key returned by
+  * {@code nearKey(cache)}; the processor is configured to sleep on the node returned by
+  * {@code backupNode(key, null)}, which is resolved inside the update thread. The key's primary
+  * node is closed 500 ms after the update was started.
+  * <p>
+  * TODO: test with one backup and store, with two backups.
+  *
+  * @throws Exception If failed.
+  */
+ public void testFullSyncTxRecovery3() throws Exception {
+     startGrids(3);
+
+     awaitPartitionMapExchange();
+
+     final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+     final Integer key = nearKey(cache);
+
+     Ignite primaryIgnite = primaryNode(key, null);
+
+     Callable<Void> updateTask = new Callable<Void>() {
+         @Override public Void call() throws Exception {
+             log.info("Start update.");
+
+             // The processor sleeps only on the backup node.
+             Set<String> sleepNodes = Collections.singleton(backupNode(key, null).name());
+
+             cache.invoke(key, new TestEntryProcessor(sleepNodes));
+
+             log.info("End update, do get");
+
+             log.info("End update, value: " + cache.get(key));
+
+             return null;
+         }
+     };
+
+     IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(updateTask, "put-thread");
+
+     // Give the update time to start before closing the primary node.
+     U.sleep(500);
+
+     primaryIgnite.close();
+
+     //U.sleep(5000);
+     //U.dumpThreads(log);
+
+     updateFut.get();
+
+     U.sleep(5000);
+
+     log.info("Value at the end: " + cache.get(key));
+ }
+
+ /**
+  * Entry processor that sets the entry value to {@code 1}. When it is executed on a node whose
+  * name is contained in the configured set, it first sleeps for 10 seconds to delay the update
+  * on that node.
+  */
+ static class TestEntryProcessor implements CacheEntryProcessor<Integer, Integer, Void> {
+     /** Names of the nodes where the processor sleeps before updating the entry. */
+     private final Set<String> nodeNames;
+
+     /**
+      * @param nodeNames Node names where sleep will be called.
+      */
+     public TestEntryProcessor(Set<String> nodeNames) {
+         this.nodeNames = nodeNames;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Void process(MutableEntry<Integer, Integer> entry, Object... args) {
+         // The local node is obtained from the entry to find out where the processor is running.
+         Ignite ignite = entry.unwrap(Ignite.class);
+
+         if (nodeNames.contains(ignite.name())) {
+             try {
+                 System.out.println(Thread.currentThread().getName() + " sleep.");
+
+                 Thread.sleep(10_000); // TODO use Latch
+
+                 System.out.println(Thread.currentThread().getName() + " end sleep.");
+             }
+             catch (InterruptedException e) {
+                 // Restore the interrupt flag so that callers up the stack can still see the interruption.
+                 Thread.currentThread().interrupt();
+
+                 throw new RuntimeException(e);
+             }
+         }
+
+         entry.setValue(1);
+
+         return null;
+     }
+ }
+
+ /**
+ *
+ */
+ private static class TestStoreFactory implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ return new CacheStoreAdapter() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return null;
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ };
+ }
+ }
+}