You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ip...@apache.org on 2019/12/27 12:58:26 UTC

[ignite] branch ignite-2.8 updated: IGNITE-12482 Fix query mapping on nodes for REPLICATED caches when there are MOVING partitions - Fixes #7182.

This is an automated email from the ASF dual-hosted git repository.

ipavlukhin pushed a commit to branch ignite-2.8
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.8 by this push:
     new abbda34  IGNITE-12482 Fix query mapping on nodes for REPLICATED caches when there are MOVING partitions - Fixes #7182.
abbda34 is described below

commit abbda3498f3b913ac18ca0928b39ea6f213487b8
Author: pavlukhin <vo...@gmail.com>
AuthorDate: Fri Dec 27 15:50:51 2019 +0300

    IGNITE-12482 Fix query mapping on nodes for REPLICATED caches when there are MOVING partitions - Fixes #7182.
    
    Signed-off-by: ipavlukhin <vo...@gmail.com>
    (cherry picked from commit ea92286323e5c680258e24eeb840af3c94b7b43e)
---
 .../query/h2/sql/GridSqlQueryParser.java           |  16 +-
 .../query/SqlQueriesTopologyMappingTest.java       | 185 +++++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite2.java          |   5 +-
 3 files changed, 200 insertions(+), 6 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 0717198..573df3c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -1811,19 +1811,19 @@ public class GridSqlQueryParser {
                 GridH2Table tbl = ((GridSqlTable)o).dataTable();
 
                 if (tbl != null) {
+                    GridCacheContext<?, ?> cctx = tbl.cacheContext();
+
                     //It's not affinity cache. Can't be local.
-                    if (tbl.cacheContext() == null)
+                    if (cctx == null)
                         return false;
 
-                    GridCacheContext cctx = tbl.cacheContext();
-
                     if (cctx.mvccEnabled())
                         return false;
 
                     if (cctx.isPartitioned())
                         return false;
 
-                    if (cctx.isReplicated() && !cctx.isReplicatedAffinityNode())
+                    if (isReplicatedLocalExecutionImpossible(cctx))
                         return false;
                 }
             }
@@ -1832,6 +1832,13 @@ public class GridSqlQueryParser {
         return true;
     }
 
+    /** */
+    private static boolean isReplicatedLocalExecutionImpossible(GridCacheContext<?, ?> cctx) {
+        // Improvement is possible:
+        // MOVING partitions check inspects full partition map, but possibly only local node check is sufficient.
+        return cctx.isReplicated() && (!cctx.affinityNode() || cctx.topology().hasMovingPartitions());
+    }
+
     /**
      * Get first (i.e. random, as we need any one) partitioned cache from parsed query
      *     to determine expected query parallelism.
@@ -2460,7 +2467,6 @@ public class GridSqlQueryParser {
     /**
      * Field getter.
      */
-    @SuppressWarnings("unchecked")
     public static class Getter<T, R> {
         /** */
         private final Field fld;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueriesTopologyMappingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueriesTopologyMappingTest.java
new file mode 100644
index 0000000..2b93e59
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueriesTopologyMappingTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/** */
+public class SqlQueriesTopologyMappingTest extends AbstractIndexingCommonTest {
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setClientMode(client);
+    }
+
+    /** */
+    @Test
+    public void testPartitionedQueryWithRebalance() throws Exception {
+        checkQueryWithRebalance(CacheMode.PARTITIONED);
+    }
+
+    /** */
+    @Test
+    public void testReplicatedQueryWithRebalance() throws Exception {
+        checkQueryWithRebalance(CacheMode.REPLICATED);
+    }
+
+    /** */
+    @Test
+    public void testPartitionedQueryWithNodeFilter() throws Exception {
+        checkQueryWithNodeFilter(CacheMode.PARTITIONED);
+    }
+
+    /** */
+    @Test
+    public void testReplicatedQueryWithNodeFilter() throws Exception {
+        checkQueryWithNodeFilter(CacheMode.REPLICATED);
+    }
+
+    /** */
+    @Test
+    public void testLocalCacheQueryMapping() throws Exception {
+        IgniteEx ign0 = startGrid(0);
+
+        IgniteCache<Object, Object> cache = ign0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setCacheMode(CacheMode.LOCAL)
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        cache.put(1, 2);
+
+        startGrid(1);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer");
+
+        {
+            List<List<?>> res0 = grid(0).cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+            assertEquals(1, res0.size());
+            assertEqualsCollections(Arrays.asList(1, 2), res0.get(0));
+        }
+
+        {
+            List<List<?>> res1 = grid(1).cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+            assertTrue(res1.isEmpty());
+        }
+    }
+
+    /** */
+    private void checkQueryWithRebalance(CacheMode cacheMode) throws Exception {
+        IgniteEx ign0 = startGrid(0);
+
+        IgniteCache<Object, Object> cache = ign0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setCacheMode(cacheMode)
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        cache.put(1, 2);
+
+        blockRebalanceSupplyMessages(ign0, DEFAULT_CACHE_NAME, getTestIgniteInstanceName(1));
+
+        startGrid(1);
+
+        client = true;
+
+        startGrid(10);
+
+        for (Ignite ign : G.allGrids()) {
+            List<List<?>> res = ign.cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("select * from Integer")).getAll();
+
+            assertEquals(1, res.size());
+            assertEqualsCollections(Arrays.asList(1, 2), res.get(0));
+        }
+    }
+
+    /** */
+    private void checkQueryWithNodeFilter(CacheMode cacheMode) throws Exception {
+        IgniteEx ign0 = startGrid(0);
+        String name0 = ign0.name();
+
+        IgniteCache<Object, Object> cache = ign0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setCacheMode(cacheMode)
+            .setNodeFilter(node -> name0.equals(node.attribute(ATTR_IGNITE_INSTANCE_NAME)))
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        cache.put(1, 2);
+
+        startGrid(1);
+
+        client = true;
+
+        startGrid(10);
+
+        for (Ignite ign : G.allGrids()) {
+            List<List<?>> res = ign.cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("select * from Integer")).getAll();
+
+            assertEquals(1, res.size());
+            assertEqualsCollections(Arrays.asList(1, 2), res.get(0));
+        }
+    }
+
+    /** */
+    private void blockRebalanceSupplyMessages(IgniteEx sndNode, String cacheName, String dstNodeName) {
+        int grpId = sndNode.cachex(cacheName).context().groupId();
+
+        TestRecordingCommunicationSpi comm0 = (TestRecordingCommunicationSpi)sndNode.configuration().getCommunicationSpi();
+        comm0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                String dstName = node.attribute(ATTR_IGNITE_INSTANCE_NAME);
+
+                if (dstNodeName.equals(dstName) && msg instanceof GridDhtPartitionSupplyMessage) {
+                    GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)msg;
+                    return msg0.groupId() == grpId;
+                }
+
+                return false;
+            }
+        });
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index 7100117..13ddb2f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.query.LocalQueryLazyTest;
 import org.apache.ignite.internal.processors.query.LongRunningQueryTest;
 import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
 import org.apache.ignite.internal.processors.query.SqlPartOfComplexPkLookupTest;
+import org.apache.ignite.internal.processors.query.SqlQueriesTopologyMappingTest;
 import org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest;
 import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessorTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest;
@@ -153,7 +154,9 @@ import org.junit.runners.Suite;
 
     SqlPartOfComplexPkLookupTest.class,
 
-    IgniteCacheLocalQueryDefaultTimeoutSelfTest.class
+    IgniteCacheLocalQueryDefaultTimeoutSelfTest.class,
+
+    SqlQueriesTopologyMappingTest.class
 })
 public class IgniteBinaryCacheQueryTestSuite2 {
 }