You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/06/14 05:33:53 UTC
[ignite] branch master updated: IGNITE-17069 SQL Calcite: Support segmented indexes - Fixes #10060.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5359389e152 IGNITE-17069 SQL Calcite: Support segmented indexes - Fixes #10060.
5359389e152 is described below
commit 5359389e1527a52e1de89effd4e1fedd86601417
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Jun 14 10:31:18 2022 +0500
IGNITE-17069 SQL Calcite: Support segmented indexes - Fixes #10060.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../processors/query/calcite/exec/IndexScan.java | 4 +-
.../integration/IndexScanlIntegrationTest.java | 42 ++++++++++
.../cache/query/index/IndexQueryProcessor.java | 95 ++-------------------
.../query/index/sorted/SortedSegmentedIndex.java | 18 ++++
.../query/index/sorted/inline/InlineIndexImpl.java | 96 ++++++++++++++++++++++
.../query/h2/index/client/ClientInlineIndex.java | 11 +++
6 files changed, 173 insertions(+), 93 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 012070c5548..40874bdafd8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -306,9 +306,7 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
/** {@inheritDoc} */
@Override public GridCursor<IndexRow> find(IndexRow lower, IndexRow upper, IndexQueryContext qctx) {
try {
- int seg = 0; // TODO segments support
-
- return idx.find(lower, upper, true, true, seg, qctx);
+ return idx.find(lower, upper, true, true, qctx);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to find index rows", e);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
index dc9e1e1acf2..29f6b5fc36d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
@@ -22,10 +22,14 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -89,4 +93,42 @@ public class IndexScanlIntegrationTest extends AbstractBasicIntegrationTest {
// range scan and passed to predicate.
assertEquals(1, filteredRows.get());
}
+
+ /** */
+ @Test
+ public void testSegmentedIndexes() {
+ IgniteCache<Integer, Employer> emp = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp")
+ .setSqlSchema("PUBLIC")
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp")))
+ .setQueryParallelism(10)
+ );
+
+ executeSql("CREATE INDEX idx1 ON emp(salary)");
+ executeSql("CREATE INDEX idx2 ON emp(name DESC)");
+
+ for (int i = 0; i < 100; i++)
+ emp.put(i, new Employer("emp" + i, (double)i));
+
+ assertQuery("SELECT name FROM emp WHERE salary BETWEEN 50 AND 55 ORDER BY salary")
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "EMP", "IDX1"))
+ .ordered()
+ .returns("emp50")
+ .returns("emp51")
+ .returns("emp52")
+ .returns("emp53")
+ .returns("emp54")
+ .returns("emp55")
+ .check();
+
+ assertQuery("SELECT name FROM emp WHERE name BETWEEN 'emp60' AND 'emp65' ORDER BY name DESC")
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "EMP", "IDX2"))
+ .ordered()
+ .returns("emp65")
+ .returns("emp64")
+ .returns("emp63")
+ .returns("emp62")
+ .returns("emp61")
+ .returns("emp60")
+ .check();
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
index 60dc734220f..c5dd5fb01a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
@@ -19,18 +19,14 @@ package org.apache.ignite.internal.cache.query.index;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.IndexQueryCriterion;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
@@ -97,7 +93,7 @@ public class IndexQueryProcessor {
IndexRangeQuery qry = prepareQuery(idx, idxQryDesc);
- GridCursor<IndexRow> cursor = querySortedIndex(cctx, idx, cacheFilter, qry);
+ GridCursor<IndexRow> cursor = querySortedIndex(idx, cacheFilter, qry);
SortedIndexDefinition def = (SortedIndexDefinition)idxProc.indexDefinition(idx.id());
@@ -483,13 +479,10 @@ public class IndexQueryProcessor {
* @return Result cursor.
*/
private GridCursor<IndexRow> querySortedIndex(
- GridCacheContext<?, ?> cctx,
SortedSegmentedIndex idx,
IndexingQueryFilter cacheFilter,
IndexRangeQuery qry
) throws IgniteCheckedException {
- int segmentsCnt = cctx.isPartitioned() ? cctx.config().getQueryParallelism() : 1;
-
BPlusTree.TreeRowClosure<IndexRow, IndexRow> treeFilter = null;
// No need in the additional filter step for queries with 0 or 1 criteria.
@@ -503,20 +496,11 @@ public class IndexQueryProcessor {
IndexQueryContext qryCtx = new IndexQueryContext(cacheFilter, treeFilter, null);
- if (segmentsCnt == 1)
- return treeIndexRange(idx, 0, qry, qryCtx);
-
- final GridCursor<IndexRow>[] segmentCursors = new GridCursor[segmentsCnt];
-
- // Actually it just traverses BPlusTree to find boundaries. It's too fast to parallelize this.
- for (int i = 0; i < segmentsCnt; i++)
- segmentCursors[i] = treeIndexRange(idx, i, qry, qryCtx);
-
- return new SegmentedIndexCursor(segmentCursors, (SortedIndexDefinition)idxProc.indexDefinition(idx.id()));
+ return treeIndexRange(idx, qry, qryCtx);
}
/**
- * Runs range query over specified segment. There are 2 steps to run query:
+ * Runs range query over all segments. There are 2 steps to run query:
* 1. Traverse index by specified boundaries;
* 2. Scan over cursor and filter rows that doesn't match user criteria.
*
@@ -525,13 +509,13 @@ public class IndexQueryProcessor {
* 2. To apply criteria on non-first index fields. Tree apply boundaries field by field, if first field match
* a boundary, then second field isn't checked within traversing.
*/
- private GridCursor<IndexRow> treeIndexRange(SortedSegmentedIndex idx, int segment, IndexRangeQuery qry, IndexQueryContext qryCtx)
+ private GridCursor<IndexRow> treeIndexRange(SortedSegmentedIndex idx, IndexRangeQuery qry, IndexQueryContext qryCtx)
throws IgniteCheckedException {
boolean lowIncl = inclBoundary(qry, true);
boolean upIncl = inclBoundary(qry, false);
- return idx.find(qry.lower, qry.upper, lowIncl, upIncl, segment, qryCtx);
+ return idx.find(qry.lower, qry.upper, lowIncl, upIncl, qryCtx);
}
/**
@@ -565,75 +549,6 @@ public class IndexQueryProcessor {
return key;
}
- /** Single cursor over multiple segments. The next value is chosen with the index row comparator. */
- private static class SegmentedIndexCursor implements GridCursor<IndexRow> {
- /** Cursors over segments. */
- private final PriorityQueue<GridCursor<IndexRow>> cursors;
-
- /** Comparator to compare index rows. */
- private final Comparator<GridCursor<IndexRow>> cursorComp;
-
- /** */
- private IndexRow head;
-
- /** */
- SegmentedIndexCursor(GridCursor<IndexRow>[] cursors, SortedIndexDefinition idxDef) throws IgniteCheckedException {
- cursorComp = new Comparator<GridCursor<IndexRow>>() {
- @Override public int compare(GridCursor<IndexRow> o1, GridCursor<IndexRow> o2) {
- try {
- int keysLen = o1.get().keys().length;
-
- Iterator<IndexKeyDefinition> it = idxDef.indexKeyDefinitions().values().iterator();
-
- for (int i = 0; i < keysLen; i++) {
- int cmp = idxDef.rowComparator().compareRow(o1.get(), o2.get(), i);
-
- IndexKeyDefinition def = it.next();
-
- if (cmp != 0) {
- boolean desc = def.order().sortOrder() == SortOrder.DESC;
-
- return desc ? -cmp : cmp;
- }
- }
-
- return 0;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to sort remote index rows", e);
- }
- }
- };
-
- this.cursors = new PriorityQueue<>(cursors.length, cursorComp);
-
- for (GridCursor<IndexRow> c: cursors) {
- if (c.next())
- this.cursors.add(c);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean next() throws IgniteCheckedException {
- if (cursors.isEmpty())
- return false;
-
- GridCursor<IndexRow> c = cursors.poll();
-
- head = c.get();
-
- if (c.next())
- cursors.add(c);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public IndexRow get() throws IgniteCheckedException {
- return head;
- }
- }
-
/**
* Checks index rows for matching to specified index criteria.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java
index e119f0c62ba..03a6099f6cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/SortedSegmentedIndex.java
@@ -47,6 +47,24 @@ public interface SortedSegmentedIndex extends Index {
IndexQueryContext qryCtx
) throws IgniteCheckedException;
+ /**
+ * Finds index rows by specified range in all tree segments with cache filtering. The range may be bounded or unbounded.
+ *
+ * @param lower Nullable lower bound.
+ * @param upper Nullable upper bound.
+ * @param lowerIncl {@code true} for inclusive lower bound, otherwise {@code false}.
+ * @param upperIncl {@code true} for inclusive upper bound, otherwise {@code false}.
+ * @param qryCtx External index query context.
+ * @return Cursor of found index rows.
+ */
+ public GridCursor<IndexRow> find(
+ @Nullable IndexRow lower,
+ @Nullable IndexRow upper,
+ boolean lowerIncl,
+ boolean upperIncl,
+ IndexQueryContext qryCtx
+ ) throws IgniteCheckedException;
+
/**
* Finds first index row for specified tree segment and cache filter.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index 57f8069f8fc..a437e40e32d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.cache.query.index.sorted.inline;
+import java.util.Comparator;
+import java.util.PriorityQueue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
@@ -24,9 +26,12 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.cache.query.index.AbstractIndex;
import org.apache.ignite.internal.cache.query.index.SingleCursor;
+import org.apache.ignite.internal.cache.query.index.SortOrder;
import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRowImpl;
import org.apache.ignite.internal.cache.query.index.sorted.IndexValueCursor;
import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
@@ -104,6 +109,27 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
return segments[segment].find(lower, upper, lowIncl, upIncl, closure, null);
}
+ /** {@inheritDoc} */
+ @Override public GridCursor<IndexRow> find(
+ IndexRow lower,
+ IndexRow upper,
+ boolean lowIncl,
+ boolean upIncl,
+ IndexQueryContext qryCtx
+ ) throws IgniteCheckedException {
+ int segmentsCnt = segmentsCount();
+
+ if (segmentsCnt == 1)
+ return find(lower, upper, lowIncl, upIncl, 0, qryCtx);
+
+ final GridCursor<IndexRow>[] segmentCursors = new GridCursor[segmentsCnt];
+
+ for (int i = 0; i < segmentsCnt; i++)
+ segmentCursors[i] = find(lower, upper, lowIncl, upIncl, i, qryCtx);
+
+ return new SegmentedIndexCursor(segmentCursors, def);
+ }
+
/** {@inheritDoc} */
@Override public long count(int segment) throws IgniteCheckedException {
return segments[segment].size();
@@ -471,4 +497,74 @@ public class InlineIndexImpl extends AbstractIndex implements InlineIndex {
public SortedIndexDefinition indexDefinition() {
return def;
}
+
+ /** Single cursor over multiple segments. The next value is chosen with the index row comparator. */
+ private static class SegmentedIndexCursor implements GridCursor<IndexRow> {
+ /** Cursors over segments. */
+ private final PriorityQueue<GridCursor<IndexRow>> cursors;
+
+ /** Comparator to compare index rows. */
+ private final Comparator<GridCursor<IndexRow>> cursorComp;
+
+ /** */
+ private IndexRow head;
+
+ /** */
+ SegmentedIndexCursor(GridCursor<IndexRow>[] cursors, SortedIndexDefinition idxDef) throws IgniteCheckedException {
+ cursorComp = new Comparator<GridCursor<IndexRow>>() {
+ private final IndexRowComparator rowComparator = idxDef.rowComparator();
+
+ private final IndexKeyDefinition[] keyDefs =
+ idxDef.indexKeyDefinitions().values().toArray(new IndexKeyDefinition[0]);
+
+ @Override public int compare(GridCursor<IndexRow> o1, GridCursor<IndexRow> o2) {
+ try {
+ int keysLen = o1.get().keys().length;
+
+ for (int i = 0; i < keysLen; i++) {
+ int cmp = rowComparator.compareRow(o1.get(), o2.get(), i);
+
+ if (cmp != 0) {
+ boolean desc = keyDefs[i].order().sortOrder() == SortOrder.DESC;
+
+ return desc ? -cmp : cmp;
+ }
+ }
+
+ return 0;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to sort remote index rows", e);
+ }
+ }
+ };
+
+ this.cursors = new PriorityQueue<>(cursors.length, cursorComp);
+
+ for (GridCursor<IndexRow> c: cursors) {
+ if (c.next())
+ this.cursors.add(c);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() throws IgniteCheckedException {
+ if (cursors.isEmpty())
+ return false;
+
+ GridCursor<IndexRow> c = cursors.poll();
+
+ head = c.get();
+
+ if (c.next())
+ cursors.add(c);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexRow get() throws IgniteCheckedException {
+ return head;
+ }
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java
index ae2b99c6a65..40c20b3cf3a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/index/client/ClientInlineIndex.java
@@ -70,6 +70,17 @@ public class ClientInlineIndex extends AbstractClientIndex implements InlineInde
throw unsupported();
}
+ /** {@inheritDoc} */
+ @Override public GridCursor<IndexRow> find(
+ IndexRow lower,
+ IndexRow upper,
+ boolean lowIncl,
+ boolean upIncl,
+ IndexQueryContext qryCtx
+ ) {
+ throw unsupported();
+ }
+
/** {@inheritDoc} */
@Override public GridCursor<IndexRow> findFirst(int segment, IndexQueryContext qryCtx) {
throw unsupported();