You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/06/15 18:09:14 UTC

[ignite] branch master updated: IGNITE-14120 fix multiple results bug when query parallelism is enabled for single partition query. (#9164)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ae55e3  IGNITE-14120 fix multiple results bug when query parallelism is enabled for single partition query. (#9164)
7ae55e3 is described below

commit 7ae55e302284dff6731c6e6d394ce0124c9a8e3e
Author: Vladimir Ermakov <85...@users.noreply.github.com>
AuthorDate: Tue Jun 15 21:08:52 2021 +0300

    IGNITE-14120 fix multiple results bug when query parallelism is enabled for single partition query. (#9164)
---
 .../query/index/sorted/inline/InlineIndexImpl.java |  15 +-
 .../query/h2/twostep/GridMapQueryExecutor.java     |   9 +-
 .../query/h2/twostep/GridReduceQueryExecutor.java  |   4 +-
 ...niteSqlSinglePartitionMultiParallelismTest.java | 157 +++++++++++++++++++++
 .../testsuites/IgniteCacheQuerySelfTestSuite6.java |   4 +-
 5 files changed, 181 insertions(+), 8 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index 5dee50e..ebadf4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -347,11 +347,20 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
     }
 
     /**
-     * @param row cache row.
-     * @return Segment ID for given key
+     * @param row Cache row.
+     * @return Segment ID for given key.
      */
     public int segmentForRow(CacheDataRow row) {
-        return segmentsCount() == 1 ? 0 : (rowHnd.partition(row) % segmentsCount());
+        return calculateSegment(segmentsCount(), segmentsCount() == 1 ? 0 : rowHnd.partition(row));
+    }
+
+    /**
+     * @param segmentsCnt Count of segments in cache.
+     * @param part Partition.
+     * @return Segment ID for given segment count and partition.
+     */
+    public static int calculateSegment(int segmentsCnt, int part) {
+        return segmentsCnt == 1 ? 0 : (part % segmentsCnt);
     }
 
     /** */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 8da8831..6cdc51c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -86,6 +86,7 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl.calculateSegment;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
@@ -220,9 +221,13 @@ public class GridMapQueryExecutor {
 
         final List<Integer> cacheIds = req.caches();
 
-        int segments = explain || replicated || F.isEmpty(cacheIds) ? 1 :
+        final boolean singlePart = parts != null && parts.length == 1;
+        final int parallelism = explain || replicated || F.isEmpty(cacheIds) ? 1 :
             CU.firstPartitioned(ctx.cache().context(), cacheIds).config().getQueryParallelism();
 
+        final int segments = explain || replicated || singlePart ? 1 : parallelism;
+        final int singleSegment = singlePart ? calculateSegment(parallelism, parts[0]) : 0;
+
         final Object[] params = req.parameters();
 
         final int timeout = req.timeout() > 0 || req.explicitTimeout()
@@ -268,7 +273,7 @@ public class GridMapQueryExecutor {
 
         onQueryRequest0(node,
             req.requestId(),
-            0,
+            singleSegment,
             req.schemaName(),
             req.queries(),
             cacheIds,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 376902d..2564196 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -381,7 +381,7 @@ public class GridReduceQueryExecutor {
 
         final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable() || singlePartMode;
 
-        final int segmentsPerIndex = qry.explain() || qry.isReplicatedOnly() ? 1 :
+        final int segmentsPerIdx = qry.explain() || qry.isReplicatedOnly() || singlePartMode ? 1 :
             mapper.findFirstPartitioned(cacheIds).config().getQueryParallelism();
 
         final long retryTimeout = retryTimeout(timeoutMillis);
@@ -426,7 +426,7 @@ public class GridReduceQueryExecutor {
 
             try {
                 final ReduceQueryRun r = createReduceQueryRun(conn, mapQueries, nodes,
-                    pageSize, segmentsPerIndex, skipMergeTbl, qry.explain(), dataPageScanEnabled);
+                    pageSize, segmentsPerIdx, skipMergeTbl, qry.explain(), dataPageScanEnabled);
 
                 runs.put(qryReqId, r);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSinglePartitionMultiParallelismTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSinglePartitionMultiParallelismTest.java
new file mode 100644
index 0000000..d2eb332
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSinglePartitionMultiParallelismTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Test for correct results in case of a query with a single partition and a cache with parallelism > 1.
+ * Regression test for issue 'IGNITE-14120'.
+ */
+public class IgniteSqlSinglePartitionMultiParallelismTest extends AbstractIndexingCommonTest {
+    /** Name of the test cache; also used as the SQL table name. */
+    private static final String CACHE_NAME = "SC_NULL_TEST";
+
+    /** Query parallelism configured for the test cache. */
+    private static final int CACHE_PARALLELISM = 8;
+
+    /** Number of rows inserted into the table; keys are 1..KEY_CNT. */
+    private static final int KEY_CNT = 1024;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(1);
+        ignite(0).createCache(cacheConfig());
+        fillTable();
+    }
+
+    /**
+     * @return Configuration of the test cache: parallelism {@code CACHE_PARALLELISM}, table with id/val fields.
+     */
+    protected CacheConfiguration<Integer, Integer> cacheConfig() {
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+        fields.put("id", Integer.class.getName());
+        fields.put("val", Integer.class.getName());
+
+        return new CacheConfiguration<Integer, Integer>()
+            .setName(CACHE_NAME)
+            .setQueryParallelism(CACHE_PARALLELISM)
+            .setQueryEntities(
+                singletonList(
+                    new QueryEntity(Integer.class.getName(), "newKeyType")
+                        .setTableName(CACHE_NAME)
+                    .setFields(fields)
+                    .setKeyFieldName("id")
+                )
+            );
+    }
+
+    /**
+     * Checks a count over the whole table: expects a single result row equal to {@code KEY_CNT}.
+     */
+    @Test
+    public void testSimpleCountQuery() throws Exception {
+        List<List<?>> results = runQuery("select count(*) from " + CACHE_NAME);
+
+        Long res = (Long) results.get(0).get(0);
+
+        assertEquals(1, results.size());
+        assertEquals(Long.valueOf(KEY_CNT), res);
+    }
+
+    /**
+     * Checks a count by key for one key from every segment: each query must return a single row with count 1.
+     */
+    @Test
+    public void testWhereCounteryPartitionQuery() throws Exception {
+        for (int segment = 0; segment < CACHE_PARALLELISM; segment++) {
+            Integer keyForSegment = segmenKey(segment);
+
+            List<List<?>> results = runQuery("select count(*) from " + CACHE_NAME + " where ID=" + keyForSegment);
+
+            Long res = (Long) results.get(0).get(0);
+
+            assertEquals(1, results.size());
+            assertEquals(Long.valueOf(1), res);
+        }
+    }
+
+    /**
+     * Checks a count for two keys taken from the first and the last segments: expects a single row with count 2.
+     */
+    @Test
+    public void testWhereCountMultiPartitionsQuery() throws Exception {
+        Integer keyFromFirstSegment = segmenKey(0);
+        Integer keyFromLastSegment = segmenKey(CACHE_PARALLELISM - 1);
+
+        List<List<?>> results = runQuery("select count(*) from " + CACHE_NAME + " where ID="
+            + keyFromFirstSegment + " or ID=" + keyFromLastSegment);
+
+        Long res = (Long) results.get(0).get(0);
+
+        assertEquals(1, results.size());
+        assertEquals(Long.valueOf(2), res);
+    }
+
+    /**
+     * @param segment Target index segment.
+     * @return First key in range 1..KEY_CNT whose partition maps to the target segment.
+     */
+    protected Integer segmenKey(int segment) {
+        IgniteCache<Object, Object> cache = ignite(0).cache(CACHE_NAME);
+        IgniteCacheProxyImpl proxy = cache.unwrap(IgniteCacheProxyImpl.class);
+
+        GridCacheContext<?, ?> cctx = proxy.context();
+
+        for (int k = 1; k <= KEY_CNT; k++) {
+            int keySegment = InlineIndexImpl.calculateSegment(CACHE_PARALLELISM, cctx.affinity().partition(k));
+            if (keySegment == segment)
+                return k;
+        }
+
+        throw new IgniteException("Key is not found. Please, check range of keys and segmentsCnt. " +
+            "Requested segmentId is " + segment);
+    }
+
+    /** Inserts {@code KEY_CNT} rows with id and val both equal to i, for i in 1..KEY_CNT. */
+    public void fillTable() {
+        for (int i = 1; i <= KEY_CNT; i++)
+            runQuery(String.format("insert into " + CACHE_NAME + "(id, val) VALUES(%d, %d)", i, i));
+    }
+
+    /** Runs the given SQL as a fields query on the test cache and returns all result rows. */
+    public List<List<?>> runQuery(String qry) {
+        IgniteCache<Integer, Integer> cache = ignite(0).cache(CACHE_NAME);
+
+        return cache.query(
+            new SqlFieldsQuery(qry)
+        ).getAll();
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index 624dcc5..52140c4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
 import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsQueryTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSinglePartitionMultiParallelismTest;
 import org.apache.ignite.internal.processors.query.MemLeakOnSqlWithClientReconnectTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -73,7 +74,8 @@ import org.junit.runners.Suite;
     CacheContinuousQueryFilterDeploymentFailedTest.class,
     PerformanceStatisticsQueryTest.class,
     CacheContinuousQueryFilterDeploymentFailedTest.class,
-    CacheContinuousQueryDeploymentToClientTest.class
+    CacheContinuousQueryDeploymentToClientTest.class,
+    IgniteSqlSinglePartitionMultiParallelismTest.class
 })
 public class IgniteCacheQuerySelfTestSuite6 {
 }