You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ip...@apache.org on 2019/12/27 12:58:26 UTC
[ignite] branch ignite-2.8 updated: IGNITE-12482 Fix query mapping
on nodes for REPLICATED caches when there are MOVING partitions - Fixes
#7182.
This is an automated email from the ASF dual-hosted git repository.
ipavlukhin pushed a commit to branch ignite-2.8
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.8 by this push:
new abbda34 IGNITE-12482 Fix query mapping on nodes for REPLICATED caches when there are MOVING partitions - Fixes #7182.
abbda34 is described below
commit abbda3498f3b913ac18ca0928b39ea6f213487b8
Author: pavlukhin <vo...@gmail.com>
AuthorDate: Fri Dec 27 15:50:51 2019 +0300
IGNITE-12482 Fix query mapping on nodes for REPLICATED caches when there are MOVING partitions - Fixes #7182.
Signed-off-by: ipavlukhin <vo...@gmail.com>
(cherry picked from commit ea92286323e5c680258e24eeb840af3c94b7b43e)
---
.../query/h2/sql/GridSqlQueryParser.java | 16 +-
.../query/SqlQueriesTopologyMappingTest.java | 185 +++++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite2.java | 5 +-
3 files changed, 200 insertions(+), 6 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 0717198..573df3c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -1811,19 +1811,19 @@ public class GridSqlQueryParser {
GridH2Table tbl = ((GridSqlTable)o).dataTable();
if (tbl != null) {
+ GridCacheContext<?, ?> cctx = tbl.cacheContext();
+
//It's not affinity cache. Can't be local.
- if (tbl.cacheContext() == null)
+ if (cctx == null)
return false;
- GridCacheContext cctx = tbl.cacheContext();
-
if (cctx.mvccEnabled())
return false;
if (cctx.isPartitioned())
return false;
- if (cctx.isReplicated() && !cctx.isReplicatedAffinityNode())
+ if (isReplicatedLocalExecutionImpossible(cctx))
return false;
}
}
@@ -1832,6 +1832,13 @@ public class GridSqlQueryParser {
return true;
}
+ /** Returns {@code true} if {@code cctx.isReplicated()} and either {@code cctx.affinityNode()} is false or {@code cctx.topology().hasMovingPartitions()} is true. */
+ private static boolean isReplicatedLocalExecutionImpossible(GridCacheContext<?, ?> cctx) {
+ // Improvement is possible:
+ // MOVING partitions check inspects full partition map, but possibly only local node check is sufficient.
+ return cctx.isReplicated() && (!cctx.affinityNode() || cctx.topology().hasMovingPartitions());
+ }
+
/**
* Get first (i.e. random, as we need any one) partitioned cache from parsed query
* to determine expected query parallelism.
@@ -2460,7 +2467,6 @@ public class GridSqlQueryParser {
/**
* Field getter.
*/
- @SuppressWarnings("unchecked")
public static class Getter<T, R> {
/** */
private final Field fld;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueriesTopologyMappingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueriesTopologyMappingTest.java
new file mode 100644
index 0000000..2b93e59
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueriesTopologyMappingTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/** Checks SQL query results on all started grids for caches under blocked rebalance supply, with a node filter, and in LOCAL mode. */
+public class SqlQueriesTopologyMappingTest extends AbstractIndexingCommonTest {
+ /** Client mode flag applied to the configuration built in {@link #getConfiguration(String)}. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setClientMode(client);
+ }
+
+ /** Runs {@link #checkQueryWithRebalance(CacheMode)} for a PARTITIONED cache. */
+ @Test
+ public void testPartitionedQueryWithRebalance() throws Exception {
+ checkQueryWithRebalance(CacheMode.PARTITIONED);
+ }
+
+ /** Runs {@link #checkQueryWithRebalance(CacheMode)} for a REPLICATED cache. */
+ @Test
+ public void testReplicatedQueryWithRebalance() throws Exception {
+ checkQueryWithRebalance(CacheMode.REPLICATED);
+ }
+
+ /** Runs {@link #checkQueryWithNodeFilter(CacheMode)} for a PARTITIONED cache. */
+ @Test
+ public void testPartitionedQueryWithNodeFilter() throws Exception {
+ checkQueryWithNodeFilter(CacheMode.PARTITIONED);
+ }
+
+ /** Runs {@link #checkQueryWithNodeFilter(CacheMode)} for a REPLICATED cache. */
+ @Test
+ public void testReplicatedQueryWithNodeFilter() throws Exception {
+ checkQueryWithNodeFilter(CacheMode.REPLICATED);
+ }
+
+ /** Checks that a LOCAL cache query returns the row put via grid 0 when run on grid 0 and an empty result when run on grid 1. */
+ @Test
+ public void testLocalCacheQueryMapping() throws Exception {
+ IgniteEx ign0 = startGrid(0);
+
+ IgniteCache<Object, Object> cache = ign0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setCacheMode(CacheMode.LOCAL)
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(Integer.class, Integer.class));
+
+ cache.put(1, 2);
+
+ startGrid(1);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer");
+
+ {
+ List<List<?>> res0 = grid(0).cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+ assertEquals(1, res0.size());
+ assertEqualsCollections(Arrays.asList(1, 2), res0.get(0));
+ }
+
+ {
+ List<List<?>> res1 = grid(1).cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+ assertTrue(res1.isEmpty());
+ }
+ }
+
+ /** Blocks supply messages from grid 0 to grid 1, starts grid 1 and a client grid 10, then asserts every grid's query returns the single row (1, 2). */
+ private void checkQueryWithRebalance(CacheMode cacheMode) throws Exception {
+ IgniteEx ign0 = startGrid(0);
+
+ IgniteCache<Object, Object> cache = ign0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setCacheMode(cacheMode)
+ .setIndexedTypes(Integer.class, Integer.class));
+
+ cache.put(1, 2);
+
+ blockRebalanceSupplyMessages(ign0, DEFAULT_CACHE_NAME, getTestIgniteInstanceName(1));
+
+ startGrid(1);
+
+ client = true;
+
+ startGrid(10);
+
+ for (Ignite ign : G.allGrids()) {
+ List<List<?>> res = ign.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("select * from Integer")).getAll();
+
+ assertEquals(1, res.size());
+ assertEqualsCollections(Arrays.asList(1, 2), res.get(0));
+ }
+ }
+
+ /** Creates a cache whose node filter matches only grid 0's instance name, starts grid 1 and a client grid 10, then asserts every grid's query returns the single row (1, 2). */
+ private void checkQueryWithNodeFilter(CacheMode cacheMode) throws Exception {
+ IgniteEx ign0 = startGrid(0);
+ String name0 = ign0.name();
+
+ IgniteCache<Object, Object> cache = ign0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setCacheMode(cacheMode)
+ .setNodeFilter(node -> name0.equals(node.attribute(ATTR_IGNITE_INSTANCE_NAME)))
+ .setIndexedTypes(Integer.class, Integer.class));
+
+ cache.put(1, 2);
+
+ startGrid(1);
+
+ client = true;
+
+ startGrid(10);
+
+ for (Ignite ign : G.allGrids()) {
+ List<List<?>> res = ign.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("select * from Integer")).getAll();
+
+ assertEquals(1, res.size());
+ assertEqualsCollections(Arrays.asList(1, 2), res.get(0));
+ }
+ }
+
+ /** Blocks {@link GridDhtPartitionSupplyMessage}s with the cache's group id that {@code sndNode} sends to the node whose instance name equals {@code dstNodeName}. */
+ private void blockRebalanceSupplyMessages(IgniteEx sndNode, String cacheName, String dstNodeName) {
+ int grpId = sndNode.cachex(cacheName).context().groupId();
+
+ TestRecordingCommunicationSpi comm0 = (TestRecordingCommunicationSpi)sndNode.configuration().getCommunicationSpi();
+ comm0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ String dstName = node.attribute(ATTR_IGNITE_INSTANCE_NAME);
+
+ if (dstNodeName.equals(dstName) && msg instanceof GridDhtPartitionSupplyMessage) {
+ GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)msg;
+ return msg0.groupId() == grpId;
+ }
+
+ return false;
+ }
+ });
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index 7100117..13ddb2f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.query.LocalQueryLazyTest;
import org.apache.ignite.internal.processors.query.LongRunningQueryTest;
import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
import org.apache.ignite.internal.processors.query.SqlPartOfComplexPkLookupTest;
+import org.apache.ignite.internal.processors.query.SqlQueriesTopologyMappingTest;
import org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest;
import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessorTest;
import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest;
@@ -153,7 +154,9 @@ import org.junit.runners.Suite;
SqlPartOfComplexPkLookupTest.class,
- IgniteCacheLocalQueryDefaultTimeoutSelfTest.class
+ IgniteCacheLocalQueryDefaultTimeoutSelfTest.class,
+
+ SqlQueriesTopologyMappingTest.class
})
public class IgniteBinaryCacheQueryTestSuite2 {
}