You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/30 06:33:28 UTC

ignite git commit: ignite-6454 Disable interrupts for cache.get in pessimistic tx like for all other cache operations

Repository: ignite
Updated Branches:
  refs/heads/master bdb223996 -> d8e46de28


ignite-6454 Disable interrupts for cache.get in pessimistic tx like for all other cache operations


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8e46de2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8e46de2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8e46de2

Branch: refs/heads/master
Commit: d8e46de2899b35fa498310a31705e6a9760a84f3
Parents: bdb2239
Author: sboikov <sb...@apache.org>
Authored: Tue Oct 30 09:33:13 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Tue Oct 30 09:33:13 2018 +0300

----------------------------------------------------------------------
 .../cache/distributed/near/GridNearTxLocal.java |  12 +-
 .../internal/util/future/GridFutureAdapter.java |  14 +-
 ...eAbstractDataStructuresFailoverSelfTest.java |   4 +-
 ...rtitionedDataStructuresFailoverSelfTest.java |  14 --
 ...eplicatedDataStructuresFailoverSelfTest.java |  28 ---
 .../CacheOperationsInterruptTest.java           | 170 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite9.java       |   3 +
 7 files changed, 193 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f56d99b..d8e4353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -2422,16 +2422,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     try {
                         IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
 
-                        return fut1.isDone() ?
+                        return nonInterruptable(fut1.isDone() ?
                             new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
-                            new GridEmbeddedFuture<>(finClos, fut1);
+                            new GridEmbeddedFuture<>(finClos, fut1));
                     }
                     catch (GridClosureException e) {
                         return new GridFinishedFuture<>(e.unwrap());
                     }
                     catch (IgniteCheckedException e) {
                         try {
-                            return plc2.apply(false, e);
+                            return nonInterruptable(plc2.apply(false, e));
                         }
                         catch (Exception e1) {
                             return new GridFinishedFuture<>(e1);
@@ -2439,10 +2439,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     }
                 }
                 else {
-                    return new GridEmbeddedFuture<>(
+                    return nonInterruptable(new GridEmbeddedFuture<>(
                         fut,
                         plc2,
-                        finClos);
+                        finClos));
                 }
             }
             else {
@@ -4739,7 +4739,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @param fut Future.
      * @return Future ignoring interrupts on {@code get()}.
      */
-    private <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) {
+    private static <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) {
         // Safety.
         if (fut instanceof GridFutureAdapter)
             ((GridFutureAdapter)fut).ignoreInterrupts();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 8302504..a191f30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -357,13 +357,23 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
 
     /** {@inheritDoc} */
     @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
-        return new ChainFuture<>(this, doneCb, null);
+        ChainFuture fut = new ChainFuture<>(this, doneCb, null);
+
+        if (ignoreInterrupts)
+            fut.ignoreInterrupts();
+
+        return fut;
     }
 
     /** {@inheritDoc} */
     @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
         Executor exec) {
-        return new ChainFuture<>(this, doneCb, exec);
+        ChainFuture fut = new ChainFuture<>(this, doneCb, exec);
+
+        if (ignoreInterrupts)
+            fut.ignoreInterrupts();
+
+        return fut;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 797e90f..6e3a7bb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -1320,7 +1320,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
         protected final AtomicBoolean failed = new AtomicBoolean(false);
 
         /** */
-        private final int topChangeThreads;
+        protected final int topChangeThreads;
 
         /** Flag to enable circular topology change. */
         private boolean circular;
@@ -1445,7 +1445,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                             throw F.wrap(e);
                     }
                 }
-            }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+            }, topChangeThreads, "topology-change-thread");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
index eecfefe..ecb2df9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
@@ -38,18 +38,4 @@ public class GridCachePartitionedDataStructuresFailoverSelfTest
     @Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
         return TRANSACTIONAL;
     }
-
-    /**
-     *
-     */
-    @Override public void testReentrantLockConstantTopologyChangeNonFailoverSafe() {
-        fail("https://issues.apache.org/jira/browse/IGNITE-6454");
-    }
-
-    /**
-     *
-     */
-    @Override public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() {
-        fail("https://issues.apache.org/jira/browse/IGNITE-6454");
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
index 27fbdcf..b093d12 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
@@ -38,32 +38,4 @@ public class GridCacheReplicatedDataStructuresFailoverSelfTest
     @Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
         return TRANSACTIONAL;
     }
-
-    /**
-     *
-     */
-    @Override public void testFairReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() {
-        fail("https://issues.apache.org/jira/browse/IGNITE-6454");
-    }
-
-    /**
-     *
-     */
-    @Override public void testReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() {
-        fail("https://issues.apache.org/jira/browse/IGNITE-6454");
-    }
-
-    /**
-     *
-     */
-    @Override public void testReentrantLockConstantTopologyChangeNonFailoverSafe() {
-        fail("https://issues.apache.org/jira/browse/IGNITE-6454");
-    }
-
-    /**
-     *
-     */
-    @Override public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() {
-        fail("https://issues.apache.org/jira/browse/IGNITE-6454");
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java
new file mode 100644
index 0000000..971dbd1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Checks that {@code cache.get} in pessimistic transactions still completes after threads running such reads are cancelled.
+ */
+public class CacheOperationsInterruptTest extends GridCommonAbstractTest {
+    /** Discovery IP finder set on every node configuration created by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setCacheMode(REPLICATED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(); // The test method starts its own grids, so stop them all after it.
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInterruptPessimisticTx() throws Exception {
+        final int NODES = 3;
+
+        startGrids(NODES);
+
+        awaitPartitionMapExchange();
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        Ignite node = ignite(0);
+
+        IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+
+        final int KEYS = 100;
+
+        final boolean changeTop = true; // Enables the concurrent start/stop of an extra node below.
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            try {
+                IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                    @Override public void run() {
+                        Ignite node = ignite(0);
+
+                        IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (!stop.get()) { // Each pass reads a random subset of keys; the tx is closed without commit.
+                            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                for (int i = 0; i < KEYS; i++) {
+                                    if (rnd.nextBoolean())
+                                        cache.get(i);
+                                }
+                            }
+                        }
+                    }
+                }, 3, "tx-thread");
+
+                IgniteInternalFuture<?> changeTopFut = null;
+
+                if (changeTop) {
+                    changeTopFut = GridTestUtils.runAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            while (!stop.get()) {
+                                startGrid(NODES); // Extra node with index NODES, started and stopped in a loop.
+
+                                stopGrid(NODES);
+                            }
+
+                            return null;
+                        }
+                    });
+                }
+
+                U.sleep(rnd.nextInt(500));
+
+                fut.cancel(); // NOTE(review): presumably interrupts the tx threads mid-operation - confirm in GridTestUtils.
+
+                U.sleep(rnd.nextInt(500));
+
+                stop.set(true);
+
+                try {
+                    fut.get();
+                }
+                catch (Exception e) {
+                    info("Ignore error: " + e);
+                }
+
+                if (changeTopFut != null)
+                    changeTopFut.get();
+
+                info("Try get");
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    for (int k = 0; k < KEYS; k++) // Reads every key from the test thread after the cancelled run.
+                        cache.get(k);
+                }
+
+                info("Try get done");
+
+                startGrid(NODES);
+                stopGrid(NODES);
+            }
+            finally {
+                stop.set(true); // Makes sure background loops exit even if the iteration failed.
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 7dba461..173c555 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.CachePutIfAbsentTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest;
@@ -52,6 +53,8 @@ public class IgniteCacheTestSuite9 extends TestSuite {
 
         suite.addTestSuite(TxDataConsistencyOnCommitFailureTest.class);
 
+        suite.addTestSuite(CacheOperationsInterruptTest.class);
+
         return suite;
     }
 }