You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/06/14 05:33:53 UTC

[ignite] branch master updated: IGNITE-17069 SQL Calcite: Support segmented indexes - Fixes #10060.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5359389e152 IGNITE-17069 SQL Calcite: Support segmented indexes - Fixes #10060.
5359389e152 is described below

commit 5359389e1527a52e1de89effd4e1fedd86601417
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Jun 14 10:31:18 2022 +0500

    IGNITE-17069 SQL Calcite: Support segmented indexes - Fixes #10060.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../processors/query/calcite/exec/IndexScan.java   |  4 +-
 .../integration/IndexScanlIntegrationTest.java     | 42 ++++++++++
 .../cache/query/index/IndexQueryProcessor.java     | 95 ++-------------------
 .../query/index/sorted/SortedSegmentedIndex.java   | 18 ++++
 .../query/index/sorted/inline/InlineIndexImpl.java | 96 ++++++++++++++++++++++
 .../query/h2/index/client/ClientInlineIndex.java   | 11 +++
 6 files changed, 173 insertions(+), 93 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 012070c5548..40874bdafd8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -306,9 +306,7 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
         /** {@inheritDoc} */
         @Override public GridCursor<IndexRow> find(IndexRow lower, IndexRow upper, IndexQueryContext qctx) {
             try {
-                int seg = 0; // TODO segments support
-
-                return idx.find(lower, upper, true, true, seg, qctx);
+                return idx.find(lower, upper, true, true, qctx);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException("Failed to find index rows", e);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
index dc9e1e1acf2..29f6b5fc36d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
@@ -22,10 +22,14 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
@@ -89,4 +93,42 @@ public class IndexScanlIntegrationTest extends AbstractBasicIntegrationTest {
         // range scan and passed to predicate.
         assertEquals(1, filteredRows.get());
     }
+
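+    /** Checks that ordered index scans return correct rows for a cache configured with query parallelism greater than one. */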
+    @Test
+    public void testSegmentedIndexes() {
+        IgniteCache<Integer, Employer> emp = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp")))
+            .setQueryParallelism(10)
+        );
+
+        executeSql("CREATE INDEX idx1 ON emp(salary)");
+        executeSql("CREATE INDEX idx2 ON emp(name DESC)");
+
+        for (int i = 0; i < 100; i++)
+            emp.put(i, new Employer("emp" + i, (double)i));
+
+        assertQuery("SELECT name FROM emp WHERE salary BETWEEN 50 AND 55 ORDER BY salary")
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "EMP", "IDX1"))
+            .ordered()
+            .returns("emp50")
+            .returns("emp51")
+            .returns("emp52")
+            .returns("emp53")
+            .returns("emp54")
+            .returns("emp55")
+            .check();
+
+        assertQuery("SELECT name FROM emp WHERE name BETWEEN 'emp60' AND 'emp65' ORDER BY name DESC")
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "EMP", "IDX2"))
+            .ordered()
+            .returns("emp65")
+            .returns("emp64")
+            .returns("emp63")
+            .returns("emp62")
+            .returns("emp61")
+            .returns("emp60")
+            .check();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
index 60dc734220f..c5dd5fb01a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
@@ -19,18 +19,14 @@ package org.apache.ignite.internal.cache.query.index;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.IndexQuery;
 import org.apache.ignite.cache.query.IndexQueryCriterion;
 import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
@@ -97,7 +93,7 @@ public class IndexQueryProcessor {
 
         IndexRangeQuery qry = prepareQuery(idx, idxQryDesc);
 
-        GridCursor<IndexRow> cursor = querySortedIndex(cctx, idx, cacheFilter, qry);
+        GridCursor<IndexRow> cursor = querySortedIndex(idx, cacheFilter, qry);
 
         SortedIndexDefinition def = (SortedIndexDefinition)idxProc.indexDefinition(idx.id());
 
@@ -483,13 +479,10 @@ public class IndexQueryProcessor {
      * @return Result cursor.
      */
     private GridCursor<IndexRow> querySortedIndex(
-        GridCacheContext<?, ?> cctx,
         SortedSegmentedIndex idx,
         IndexingQueryFilter cacheFilter,
         IndexRangeQuery qry
     ) throws IgniteCheckedException {
-        int segmentsCnt = cctx.isPartitioned() ? cctx.config().getQueryParallelism() : 1;
-
         BPlusTree.TreeRowClosure<IndexRow, IndexRow> treeFilter = null;
 
         // No need in the additional filter step for queries with 0 or 1 criteria.
@@ -503,20 +496,11 @@ public class IndexQueryProcessor {
 
         IndexQueryContext qryCtx = new IndexQueryContext(cacheFilter, treeFilter, null);
 
-        if (segmentsCnt == 1)
-            return treeIndexRange(idx, 0, qry, qryCtx);
-
-        final GridCursor<IndexRow>[] segmentCursors = new GridCursor[segmentsCnt];
-
-        // Actually it just traverses BPlusTree to find boundaries. It's too fast to parallelize this.
-        for (int i = 0; i < segmentsCnt; i++)
-            segmentCursors[i] = treeIndexRange(idx, i, qry, qryCtx);
-
-        return new SegmentedIndexCursor(segmentCursors, (SortedIndexDefinition)idxProc.indexDefinition(idx.id()));
+        return treeIndexRange(idx, qry, qryCtx);
     }
 
     /**
-     * Runs range query over specified segment. There are 2 steps to run query:
+     * Runs range query over all segments. There are 2 steps to run query:
      * 1. Traverse index by specified boundaries;
      * 2. Scan over cursor and filter rows that doesn't match user criteria.
      *
@@ -525,13 +509,13 @@ public class IndexQueryProcessor {
      * 2. To apply criteria on non-first index fields. Tree apply boundaries field by field, if first field match
      * a boundary, then second field isn't checked within traversing.
      */
-    private GridCursor<IndexRow> treeIndexRange(SortedSegmentedIndex idx, int segment, IndexRangeQuery qry, IndexQueryContext qryCtx)
+    private GridCursor<IndexRow> treeIndexRange(SortedSegmentedIndex idx, IndexRangeQuery qry, IndexQueryContext qryCtx)
         throws IgniteCheckedException {
 
         boolean lowIncl = inclBoundary(qry, true);
         boolean upIncl = inclBoundary(qry, false);
 
-        return idx.find(qry.lower, qry.upper, lowIncl, upIncl, segment, qryCtx);
+        return idx.find(qry.lower, qry.upper, lowIncl, upIncl, qryCtx);
     }
 
     /**
@@ -565,75 +549,6 @@ public class IndexQueryProcessor {
         return key;
     }
 
-    /** Single cursor over multiple segments. The next value is chosen with the index row comparator. */
-    private static class SegmentedIndexCursor implements GridCursor<IndexRow> {
-        /** Cursors over segments. */
-        private final PriorityQueue<GridCursor<IndexRow>> cursors;
-
-        /** Comparator to compare index rows. */
-        private final Comparator<GridCursor<IndexRow>> cursorComp;
-
-        /** */
-        private IndexRow head;
-
-        /** */
-        SegmentedIndexCursor(GridCursor<IndexRow>[] cursors, SortedIndexDefinition idxDef) throws IgniteCheckedException {
-            cursorComp = new Comparator<GridCursor<IndexRow>>() {
-                @Override public int compare(GridCursor<IndexRow> o1, GridCursor<IndexRow> o2) {
-                    try {
-                        int keysLen = o1.get().keys().length;
-
-                        Iterator<IndexKeyDefinition> it = idxDef.indexKeyDefinitions().values().iterator();
-
-                        for (int i = 0; i < keysLen; i++) {
-                            int cmp = idxDef.rowComparator().compareRow(o1.get(), o2.get(), i);
-
-                            IndexKeyDefinition def = it.next();
-
-                            if (cmp != 0) {
-                                boolean desc = def.order().sortOrder() == SortOrder.DESC;
-
-                                return desc ? -cmp : cmp;
-                            }
-                        }
-
-                        return 0;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException("Failed to sort remote index rows", e);
-                    }
-                }
-            };
-
-            this.cursors = new PriorityQueue<>(cursors.length, cursorComp);
-
-            for (GridCursor<IndexRow> c: cursors) {
-                if (c.next())
-                    this.cursors.add(c);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() throws IgniteCheckedException {
-            if (cursors.isEmpty())
-                return false;
-
-            GridCursor<IndexRow> c = cursors.poll();
-
-            head = c.get();
-
-            if (c.next())
-                cursors.add(c);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IndexRow get() throws IgniteCheckedException {
-            return head;
-        }
-    }
-
     /**
      * Checks index rows for matching to specified index criteria.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java
index e119f0c62ba..03a6099f6cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java
@@ -47,6 +47,24 @@ public interface SortedSegmentedIndex extends Index {
         IndexQueryContext qryCtx
     ) throws IgniteCheckedException;
 
+    /**
+     * Finds index rows by specified range in all tree segments with cache filtering. Range can be bounded or unbounded.
+     *
+     * @param lower Nullable lower bound.
+     * @param upper Nullable upper bound.
+     * @param lowerIncl {@code true} for inclusive lower bound, otherwise {@code false}.
+     * @param upperIncl {@code true} for inclusive upper bound, otherwise {@code false}.
+     * @param qryCtx External index query context.
+     * @return Cursor of found index rows.
+     */
+    public GridCursor<IndexRow> find(
+        @Nullable IndexRow lower,
+        @Nullable IndexRow upper,
+        boolean lowerIncl,
+        boolean upperIncl,
+        IndexQueryContext qryCtx
+    ) throws IgniteCheckedException;
+
     /**
      * Finds first index row for specified tree segment and cache filter.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index 57f8069f8fc..a437e40e32d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.cache.query.index.sorted.inline;
 
+import java.util.Comparator;
+import java.util.PriorityQueue;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
@@ -24,9 +26,12 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.cache.query.index.AbstractIndex;
 import org.apache.ignite.internal.cache.query.index.SingleCursor;
+import org.apache.ignite.internal.cache.query.index.SortOrder;
 import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexRowImpl;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexValueCursor;
 import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
@@ -104,6 +109,27 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
         return segments[segment].find(lower, upper, lowIncl, upIncl, closure, null);
     }
 
+    /** {@inheritDoc} */
+    @Override public GridCursor<IndexRow> find(
+        IndexRow lower,
+        IndexRow upper,
+        boolean lowIncl,
+        boolean upIncl,
+        IndexQueryContext qryCtx
+    ) throws IgniteCheckedException {
+        int segmentsCnt = segmentsCount();
+
+        if (segmentsCnt == 1)
+            return find(lower, upper, lowIncl, upIncl, 0, qryCtx);
+
+        final GridCursor<IndexRow>[] segmentCursors = new GridCursor[segmentsCnt];
+
+        for (int i = 0; i < segmentsCnt; i++)
+            segmentCursors[i] = find(lower, upper, lowIncl, upIncl, i, qryCtx);
+
+        return new SegmentedIndexCursor(segmentCursors, def);
+    }
+
     /** {@inheritDoc} */
     @Override public long count(int segment) throws IgniteCheckedException {
         return segments[segment].size();
@@ -471,4 +497,74 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
     public SortedIndexDefinition indexDefinition() {
         return def;
     }
+
+    /** Single cursor over multiple segments. The next value is chosen with the index row comparator. */
+    private static class SegmentedIndexCursor implements GridCursor<IndexRow> {
+        /** Cursors over segments. */
+        private final PriorityQueue<GridCursor<IndexRow>> cursors;
+
+        /** Comparator to compare index rows. */
+        private final Comparator<GridCursor<IndexRow>> cursorComp;
+
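+        /** Row taken from the segment cursor polled by the latest {@link #next()} call; returned by {@link #get()}. */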
+        private IndexRow head;
+
+        /** Advances every segment cursor once and enqueues the non-empty ones, ordered by their current row. */
+        SegmentedIndexCursor(GridCursor<IndexRow>[] cursors, SortedIndexDefinition idxDef) throws IgniteCheckedException {
+            cursorComp = new Comparator<GridCursor<IndexRow>>() {
+                private final IndexRowComparator rowComparator = idxDef.rowComparator();
+
+                private final IndexKeyDefinition[] keyDefs =
+                    idxDef.indexKeyDefinitions().values().toArray(new IndexKeyDefinition[0]);
+
+                @Override public int compare(GridCursor<IndexRow> o1, GridCursor<IndexRow> o2) {
+                    try {
+                        int keysLen = o1.get().keys().length;
+
+                        for (int i = 0; i < keysLen; i++) {
+                            int cmp = rowComparator.compareRow(o1.get(), o2.get(), i);
+
+                            if (cmp != 0) {
+                                boolean desc = keyDefs[i].order().sortOrder() == SortOrder.DESC;
+
+                                return desc ? -cmp : cmp;
+                            }
+                        }
+
+                        return 0;
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException("Failed to sort remote index rows", e);
+                    }
+                }
+            };
+
+            this.cursors = new PriorityQueue<>(cursors.length, cursorComp);
+
+            for (GridCursor<IndexRow> c: cursors) {
+                if (c.next())
+                    this.cursors.add(c);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() throws IgniteCheckedException {
+            if (cursors.isEmpty())
+                return false;
+
+            GridCursor<IndexRow> c = cursors.poll();
+
+            head = c.get();
+
+            if (c.next())
+                cursors.add(c);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IndexRow get() throws IgniteCheckedException {
+            return head;
+        }
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java
index ae2b99c6a65..40c20b3cf3a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java
@@ -70,6 +70,17 @@ public class ClientInlineIndex extends AbstractClientIndex implements InlineInde
         throw unsupported();
     }
 
+    /** {@inheritDoc} */
+    @Override public GridCursor<IndexRow> find(
+        IndexRow lower,
+        IndexRow upper,
+        boolean lowIncl,
+        boolean upIncl,
+        IndexQueryContext qryCtx
+    ) {
+        throw unsupported();
+    }
+
     /** {@inheritDoc} */
     @Override public GridCursor<IndexRow> findFirst(int segment, IndexQueryContext qryCtx) {
         throw unsupported();