You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/04/21 07:41:54 UTC

[ignite] branch ignite-2.8.1 updated: IGNITE-12734 Fixed scan query over evicted partition - Fixes #7494.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch ignite-2.8.1
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.8.1 by this push:
     new faf7f06  IGNITE-12734 Fixed scan query over evicted partition - Fixes #7494.
faf7f06 is described below

commit faf7f06e33b05c029738a992482ff85e0e5ab1f2
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Wed Mar 18 13:12:47 2020 +0300

    IGNITE-12734 Fixed scan query over evicted partition - Fixes #7494.
    
    (cherry picked from commit bcaae8deef07344c7615f4ba76c63a3e5e866d6b)
---
 .../cache/IgniteCacheOffheapManager.java           |   5 +
 .../cache/IgniteCacheOffheapManagerImpl.java       |  61 ++++++----
 .../cache/persistence/GridCacheOffheapManager.java |  10 ++
 .../cache/persistence/tree/BPlusTree.java          |   8 +-
 .../processors/cache/tree/CacheDataTree.java       |   2 +
 .../cache/query/CacheScanQueryFailoverTest.java    | 133 ++++++++++++++++++++-
 6 files changed, 190 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 47d2b7b..8c0f43a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1040,6 +1040,11 @@ public interface IgniteCacheOffheapManager {
         public void destroy() throws IgniteCheckedException;
 
         /**
+         * Mark store as destroyed.
+         */
+        public void markDestroyed() throws IgniteCheckedException;
+
+        /**
          * Clears all the records associated with logical cache with given ID.
          *
          * @param cacheId Cache ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index d208578..c8abe69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -926,40 +926,46 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     return true;
 
                 while (true) {
-                    if (cur == null) {
-                        if (dataIt.hasNext()) {
-                            CacheDataStore ds = dataIt.next();
+                    try {
+                        if (cur == null) {
+                            if (dataIt.hasNext()) {
+                                CacheDataStore ds = dataIt.next();
 
-                            curPart = ds.partId();
+                                curPart = ds.partId();
 
-                            // Data page scan is disabled by default for scan queries.
-                            // TODO https://issues.apache.org/jira/browse/IGNITE-11998
-                            CacheDataTree.setDataPageScanEnabled(false);
+                                // Data page scan is disabled by default for scan queries.
+                                // TODO https://issues.apache.org/jira/browse/IGNITE-11998
+                                CacheDataTree.setDataPageScanEnabled(false);
 
-                            try {
-                                if (mvccSnapshot == null)
-                                    cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
-                                else {
-                                    cur = cacheId == CU.UNDEFINED_CACHE_ID ?
-                                        ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot);
+                                try {
+                                    if (mvccSnapshot == null)
+                                        cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
+                                    else {
+                                        cur = cacheId == CU.UNDEFINED_CACHE_ID ?
+                                            ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot);
+                                    }
+                                }
+                                finally {
+                                    CacheDataTree.setDataPageScanEnabled(false);
                                 }
                             }
-                            finally {
-                                CacheDataTree.setDataPageScanEnabled(false);
-                            }
+                            else
+                                break;
                         }
-                        else
-                            break;
-                    }
 
-                    if (cur.next()) {
-                        next = cur.get();
-                        next.key().partition(curPart);
+                        if (cur.next()) {
+                            next = cur.get();
+                            next.key().partition(curPart);
 
-                        break;
+                            break;
+                        }
+                        else
+                            cur = null;
+                    }
+                    catch (IgniteCheckedException ex) {
+                        throw new IgniteCheckedException("Failed to get next data row due to underlying cursor " +
+                            "invalidation", ex);
                     }
-                    else
-                        cur = null;
                 }
 
                 return next != null;
@@ -2946,6 +2952,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
+        @Override public void markDestroyed() {
+            dataTree.markDestroyed();
+        }
+
+        /** {@inheritDoc} */
         @Override public void clear(int cacheId) throws IgniteCheckedException {
             assert cacheId != CU.UNDEFINED_CACHE_ID;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cd43d88..a255413 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -852,6 +852,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             ctx.database().checkpointReadUnlock();
         }
 
+        store.markDestroyed();
+
         ((GridCacheDatabaseSharedManager)ctx.database()).schedulePartitionDestroy(grp.groupId(), partId);
     }
 
@@ -2575,6 +2577,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public void markDestroyed() throws IgniteCheckedException {
+            CacheDataStore delegate = init0(true);
+
+            if (delegate != null)
+                delegate.markDestroyed();
+        }
+
+        /** {@inheritDoc} */
         @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
             CacheDataStore delegate = init0(true);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index fc14a5b..833ec04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -1006,9 +1006,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     /**
      * Check if the tree is getting destroyed.
      */
-    protected final void checkDestroyed() {
+    protected final void checkDestroyed() throws IgniteCheckedException {
         if (destroyed.get())
-            throw new IllegalStateException(CONC_DESTROY_MSG + getName());
+            throw new IgniteCheckedException(CONC_DESTROY_MSG + getName());
     }
 
     /** {@inheritDoc} */
@@ -2487,7 +2487,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     /**
      * @return {@code True} if state was changed.
      */
-    private boolean markDestroyed() {
+    public boolean markDestroyed() {
         return destroyed.compareAndSet(false, true);
     }
 
@@ -5404,6 +5404,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @throws IgniteCheckedException If failed.
          */
         final boolean nextPage(L lastRow) throws IgniteCheckedException {
+            checkDestroyed();
+
             updateLowerBound(lastRow);
 
             for (;;) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a8f9c51..a617621 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -217,6 +217,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
              * @throws IgniteCheckedException If failed.
              */
             private boolean readNextDataPage() throws IgniteCheckedException {
+                checkDestroyed();
+
                 for (;;) {
                     if (++curPage >= pagesCnt) {
                         // Reread number of pages when we reach it (it may grow).
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java
index 488ce18..8a0ebb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheScanQueryFailoverTest.java
@@ -17,17 +17,30 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.NodeOrderComparator;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -36,6 +49,7 @@ import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
 
 /**
  * ScanQuery failover test. Tests scenario where user supplied closures throw unhandled errors.
@@ -65,7 +79,8 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected boolean isRemoteJvm(String igniteInstanceName) {
-        if(igniteInstanceName.equals("client") || igniteInstanceName.equals("server"))
+        if(igniteInstanceName.equals("client") || igniteInstanceName.equals("server")
+            || igniteInstanceName.startsWith("grid"))
             return false;
         else
             return super.isRemoteJvm(igniteInstanceName);
@@ -78,6 +93,8 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
         if (name.equals("client"))
             cfg.setClientMode(true);
 
+        cfg.setConsistentId(name);
+
         cfg.setFailureHandler(new StopNodeOrHaltFailureHandler());
 
         return cfg;
@@ -115,6 +132,73 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests a scan query while partitions are being concurrently evicted.
+     */
+    @Test
+    public void testScanQueryOnEvictedPartition() throws Exception {
+        cleanPersistenceDir();
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+            new DataRegionConfiguration().setPersistenceEnabled(true));
+
+        IgniteEx grid0 = startGrid(getConfiguration("grid0").setDataStorageConfiguration(dsCfg));
+
+        grid0.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache1 = grid0.getOrCreateCache(
+            new CacheConfiguration<Integer, Integer>("cache1")
+                .setAffinity(new RoundRobinAffinityFunction(2))
+        );
+
+        IgniteCache<Integer, Integer> cache2 = grid0.getOrCreateCache(
+            new CacheConfiguration<Integer, Integer>("cache2")
+                .setAffinity(new RoundRobinAffinityFunction(2))
+        );
+
+        cache1.put(0, 0); // Put to partition 0.
+        cache1.put(1, 1); // Put to partition 1.
+
+        cache2.put(0, 0); // Put to partition 0.
+
+        for (int i = 1; i < 1_000; i += 2)
+            cache2.put(i, i); // Put to partition 1.
+
+        Iterator iter1 = cache1.query(new ScanQuery<>().setPageSize(1)).iterator();
+        Iterator iter2 = cache1.query(new ScanQuery<>().setPageSize(1)).iterator();
+
+        // Iter 1 checks the case when the cursor is switched to an evicted partition.
+        iter1.next();
+
+        // Iter 2 checks the case when the cursor is already moving through a partition and that partition is evicted.
+        iter2.next();
+        iter2.next();
+
+        startGrid(getConfiguration("grid1").setDataStorageConfiguration(dsCfg));
+
+        grid0.cluster().setBaselineTopology(grid0.cluster().topologyVersion());
+
+        // Wait for rebalance of partition 1 to grid 1 and its eviction from grid 0 for each cache.
+        awaitPartitionMapExchange();
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            grid0.cachex("cache1").context().topology().localPartition(1).state() == EVICTED &&
+                grid0.cachex("cache2").context().topology().localPartition(1).state() == EVICTED,
+            1_000L));
+
+        // Force a checkpoint to destroy the stores of the evicted partitions.
+        forceCheckpoint(grid0);
+
+        GridTestUtils.assertThrowsAnyCause(log, iter1::next, IgniteException.class, "Failed to get next data row");
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> {
+            while (iter2.hasNext())
+                iter2.next();
+
+            return null;
+        }, IgniteException.class, "Failed to get next data row");
+    }
+
+    /**
      * @param ignite Ignite instance.
      * @param configs Cache configurations.
      */
@@ -174,4 +258,51 @@ public class CacheScanQueryFailoverTest extends GridCommonAbstractTest {
             integerBinaryObjectEntry -> {
                 throw new Error("Poison pill");
         };
+
+    /**
+     * Affinity function to distribute partitions by round robin to each node.
+     */
+    private static class RoundRobinAffinityFunction implements AffinityFunction {
+        /** Partitions count. */
+        private final int partitions;
+
+        /**
+         * @param partitions Partitions count.
+         */
+        public RoundRobinAffinityFunction(int partitions) {
+            this.partitions = partitions;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return partitions;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return key.hashCode() % partitions;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> res = new ArrayList<>(partitions);
+            List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+            nodes.sort(NodeOrderComparator.getInstance());
+
+            for (int i = 0; i < partitions; i++)
+                res.add(Collections.singletonList(nodes.get(i % nodes.size())));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // No-op.
+        }
+    }
 }