You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/02/15 06:48:01 UTC
[ignite] branch sql-calcite updated: IGNITE-16111 Index rebuild handling - Fixes #9671.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 778b9a2 IGNITE-16111 Index rebuild handling - Fixes #9671.
778b9a2 is described below
commit 778b9a2c64101508d95bcab34b5dc18f2111e73f
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Feb 15 09:44:59 2022 +0300
IGNITE-16111 Index rebuild handling - Fixes #9671.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../query/calcite/exec/LogicalRelImplementor.java | 103 ++++++-
.../query/calcite/prepare/QueryPlanCacheImpl.java | 14 +-
.../query/calcite/rel/IgniteIndexScan.java | 45 +--
.../calcite/rule/LogicalScanConverterRule.java | 15 +-
.../calcite/rule/logical/ExposeIndexRule.java | 3 +
.../calcite/rule/logical/FilterScanMergeRule.java | 15 +-
.../calcite/rule/logical/ProjectScanMergeRule.java | 18 +-
.../query/calcite/schema/CacheTableImpl.java | 13 +
.../query/calcite/schema/IgniteTable.java | 14 +
.../query/calcite/schema/SchemaHolderImpl.java | 22 ++
.../query/calcite/schema/SystemViewTableImpl.java | 10 +
.../calcite/exec/LogicalRelImplementorTest.java | 329 +++++++++++++++++++++
.../integration/AbstractBasicIntegrationTest.java | 7 +-
.../integration/IndexRebuildIntegrationTest.java | 316 ++++++++++++++++++++
.../planner/AbstractAggregatePlannerTest.java | 2 +-
.../query/calcite/planner/AbstractPlannerTest.java | 211 +------------
.../calcite/planner/IndexRebuildPlannerTest.java | 93 ++++++
.../query/calcite/planner/TestTable.java | 242 +++++++++++++++
.../ignite/testsuites/IgniteCalciteTestSuite.java | 2 +
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
.../apache/ignite/testsuites/PlannerTestSuite.java | 2 +
.../query/schema/SchemaChangeListener.java | 16 +
.../processors/query/h2/IgniteH2Indexing.java | 16 +-
.../processors/query/h2/SchemaManager.java | 35 +++
24 files changed, 1282 insertions(+), 263 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 4be5e6c..12d02f7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -91,6 +92,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceS
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
@@ -99,6 +101,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
@@ -298,13 +301,107 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
Supplier<Row> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
+ ColocationGroup grp = ctx.group(rel.sourceId());
+
IgniteIndex idx = tbl.getIndex(rel.indexName());
- ColocationGroup group = ctx.group(rel.sourceId());
+ if (idx != null && !tbl.isIndexRebuildInProgress()) {
+ Iterable<Row> rowsIter = idx.scan(ctx, grp, filters, lower, upper, prj, requiredColumns);
+
+ return new ScanNode<>(ctx, rowType, rowsIter);
+ }
+ else {
+ // Index was invalidated after planning; work around it with table-scan -> sort -> index spool.
+ // If the filter or project contains correlates, a spool node is required so the input can be rewound.
+ // A sort node is required if the output must be sorted or if a spool node is required (to allow
+ // searching by index conditions).
+ // Additionally, a project node is required when a spool is inserted, since the spool needs the
+ // unmodified original input to filter by index conditions.
+ boolean filterHasCorrelation = condition != null && RexUtils.hasCorrelation(condition);
+ boolean projectHasCorrelation = projects != null && RexUtils.hasCorrelation(projects);
+ boolean spoolNodeRequired = projectHasCorrelation || filterHasCorrelation;
+ boolean projNodeRequired = projects != null && spoolNodeRequired;
+
+ Iterable<Row> rowsIter = tbl.scan(
+ ctx,
+ grp,
+ filterHasCorrelation ? null : filters,
+ projNodeRequired ? null : prj,
+ requiredColumns
+ );
- Iterable<Row> rowsIter = idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
+ // If there are projects in the scan node - after the scan we already have target row type.
+ if (!spoolNodeRequired && projects != null)
+ rowType = rel.getRowType();
- return new ScanNode<>(ctx, rowType, rowsIter);
+ Node<Row> node = new ScanNode<>(ctx, rowType, rowsIter);
+
+ RelCollation collation = rel.collation();
+
+ if ((!spoolNodeRequired && projects != null) || requiredColumns != null) {
+ collation = collation.apply(LogicalScanConverterRule.createMapping(
+ spoolNodeRequired ? null : projects,
+ requiredColumns,
+ tbl.getRowType(typeFactory).getFieldCount()
+ ));
+ }
+
+ boolean sortNodeRequired = !collation.getFieldCollations().isEmpty();
+
+ if (sortNodeRequired) {
+ SortNode<Row> sortNode = new SortNode<>(ctx, rowType, expressionFactory.comparator(collation));
+
+ sortNode.register(node);
+
+ node = sortNode;
+ }
+
+ if (spoolNodeRequired) {
+ if (lowerCond != null || upperCond != null) {
+ if (requiredColumns != null) {
+ // Remap index find predicate according to rowType of the spool.
+ int cardinality = requiredColumns.cardinality();
+ List<RexNode> remappedLowerCond = lowerCond != null ? new ArrayList<>(cardinality) : null;
+ List<RexNode> remappedUpperCond = upperCond != null ? new ArrayList<>(cardinality) : null;
+
+ for (int i = requiredColumns.nextSetBit(0); i != -1; i = requiredColumns.nextSetBit(i + 1)) {
+ if (remappedLowerCond != null)
+ remappedLowerCond.add(lowerCond.get(i));
+
+ if (remappedUpperCond != null)
+ remappedUpperCond.add(upperCond.get(i));
+ }
+
+ lower = remappedLowerCond == null ? null : expressionFactory.rowSource(remappedLowerCond);
+ upper = remappedUpperCond == null ? null : expressionFactory.rowSource(remappedUpperCond);
+ }
+ }
+
+ IndexSpoolNode<Row> spoolNode = IndexSpoolNode.createTreeSpool(
+ ctx,
+ rowType,
+ collation,
+ expressionFactory.comparator(collation),
+ filterHasCorrelation ? filters : null, // Not correlated filter included into table scan.
+ lower,
+ upper
+ );
+
+ spoolNode.register(node);
+
+ node = spoolNode;
+ }
+
+ if (projNodeRequired) {
+ ProjectNode<Row> projectNode = new ProjectNode<>(ctx, rel.getRowType(), prj);
+
+ projectNode.register(node);
+
+ node = projectNode;
+ }
+
+ return node;
+ }
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 87a8a88..37d8296 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -116,8 +116,18 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
}
/** {@inheritDoc} */
+ @Override public void onIndexRebuildStarted(String schemaName, String tblName) {
+ clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIndexRebuildFinished(String schemaName, String tblName) {
+ clear();
+ }
+
+ /** {@inheritDoc} */
@Override public void onSchemaCreated(String schemaName) {
- // No-op
+ // No-op.
}
/** {@inheritDoc} */
@@ -126,7 +136,7 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo
) {
- // No-op
+ // No-op.
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
index 339454d..9817440 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
@@ -18,11 +18,11 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rex.RexNode;
@@ -39,6 +39,9 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
/** */
private final long sourceId;
+ /** Index collation. Required only for rewriting index scan to table scan + sort in case of index rebuild. */
+ private final RelCollation collation;
+
/**
* Constructor used for deserialization.
*
@@ -47,6 +50,8 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
public IgniteIndexScan(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));
+ collation = input.getCollation();
+
Object srcIdObj = input.get("sourceId");
if (srcIdObj != null)
sourceId = ((Number)srcIdObj).longValue();
@@ -60,24 +65,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
* @param traits Traits of this relational expression
* @param tbl Table definition.
* @param idxName Index name.
- */
- public IgniteIndexScan(
- RelOptCluster cluster,
- RelTraitSet traits,
- RelOptTable tbl,
- String idxName) {
- this(cluster, traits, tbl, idxName, null, null, null, null);
- }
-
- /**
- * Creates a IndexScan.
- * @param cluster Cluster that this relational expression belongs to
- * @param traits Traits of this relational expression
- * @param tbl Table definition.
- * @param idxName Index name.
* @param proj Projects.
* @param cond Filters.
* @param requiredCols Participating columns.
+ * @param collation Index collation.
*/
public IgniteIndexScan(
RelOptCluster cluster,
@@ -87,9 +78,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
@Nullable IndexConditions idxCond,
- @Nullable ImmutableBitSet requiredCols
+ @Nullable ImmutableBitSet requiredCols,
+ RelCollation collation
) {
- this(-1L, cluster, traits, tbl, idxName, proj, cond, idxCond, requiredCols);
+ this(-1L, cluster, traits, tbl, idxName, proj, cond, idxCond, requiredCols, collation);
}
/**
@@ -101,6 +93,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
* @param proj Projects.
* @param cond Filters.
* @param requiredCols Participating colunms.
+ * @param collation Index collation.
*/
private IgniteIndexScan(
long sourceId,
@@ -111,11 +104,13 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
@Nullable IndexConditions idxCond,
- @Nullable ImmutableBitSet requiredCols
+ @Nullable ImmutableBitSet requiredCols,
+ RelCollation collation
) {
super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, idxCond, requiredCols);
this.sourceId = sourceId;
+ this.collation = collation;
}
/** {@inheritDoc} */
@@ -126,7 +121,8 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
/** {@inheritDoc} */
@Override protected RelWriter explainTerms0(RelWriter pw) {
return super.explainTerms0(pw)
- .itemIf("sourceId", sourceId, sourceId != -1);
+ .itemIf("sourceId", sourceId, sourceId != -1)
+ .item("collation", collation());
}
/** {@inheritDoc} */
@@ -137,12 +133,17 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
/** {@inheritDoc} */
@Override public IgniteRel clone(long sourceId) {
return new IgniteIndexScan(sourceId, getCluster(), getTraitSet(), getTable(),
- idxName, projects, condition, idxCond, requiredColumns);
+ idxName, projects, condition, idxCond, requiredColumns, collation);
}
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteIndexScan(sourceId, cluster, getTraitSet(), getTable(),
- idxName, projects, condition, idxCond, requiredColumns);
+ idxName, projects, condition, idxCond, requiredColumns, collation);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelCollation collation() {
+ return collation;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
index ef26e4c..348f20f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
@@ -62,10 +62,16 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
) {
RelOptCluster cluster = rel.getCluster();
IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
- IgniteIndex index = table.getIndex(rel.indexName());
+ IgniteIndex idx = table.getIndex(rel.indexName());
+
+ if (table.isIndexRebuildInProgress()) {
+ cluster.getPlanner().prune(rel);
+
+ return null;
+ }
RelDistribution distribution = table.distribution();
- RelCollation collation = index.collation();
+ RelCollation collation = idx.collation();
if (rel.projects() != null || rel.requiredColumns() != null) {
Mappings.TargetMapping mapping = createMapping(
@@ -101,7 +107,8 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
rel.projects(),
rel.condition(),
rel.indexConditions(),
- rel.requiredColumns()
+ rel.requiredColumns(),
+ idx.collation()
);
}
};
@@ -154,7 +161,7 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
}
/** */
- private static Mappings.TargetMapping createMapping(
+ public static Mappings.TargetMapping createMapping(
List<RexNode> projects,
ImmutableBitSet requiredColumns,
int tableRowSize
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java
index 620b009..9fbed19 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java
@@ -68,6 +68,9 @@ public class ExposeIndexRule extends RelRule<ExposeIndexRule.Config> {
RexNode condition = scan.condition();
ImmutableBitSet requiredCols = scan.requiredColumns();
+ if (igniteTable.isIndexRebuildInProgress())
+ return;
+
List<IgniteLogicalIndexScan> indexes = igniteTable.indexes().values().stream()
.map(idx -> idx.toRel(cluster, optTable, proj, condition, requiredCols))
.collect(Collectors.toList());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
index c6c665b..b8a8e62 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
@@ -31,9 +31,11 @@ import org.apache.calcite.rex.RexUtil;
import org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
/**
* Rule that pushes filter into the scan. This might be useful for index range scans.
@@ -91,11 +93,14 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
RelNode res = createNode(cluster, scan, trait, condition);
+ if (res == null)
+ return;
+
call.transformTo(res);
}
/** */
- protected abstract T createNode(RelOptCluster cluster, T scan, RelTraitSet traits, RexNode cond);
+ protected abstract @Nullable T createNode(RelOptCluster cluster, T scan, RelTraitSet traits, RexNode cond);
/** */
private static class FilterIndexScanMergeRule extends FilterScanMergeRule<IgniteLogicalIndexScan> {
@@ -105,12 +110,18 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
}
/** {@inheritDoc} */
- @Override protected IgniteLogicalIndexScan createNode(
+ @Override protected @Nullable IgniteLogicalIndexScan createNode(
RelOptCluster cluster,
IgniteLogicalIndexScan scan,
RelTraitSet traits,
RexNode cond
) {
+ if (scan.getTable().unwrap(IgniteTable.class).isIndexRebuildInProgress()) {
+ cluster.getPlanner().prune(scan);
+
+ return null;
+ }
+
return IgniteLogicalIndexScan.create(cluster, traits, scan.getTable(), scan.indexName(),
scan.projects(), cond, scan.requiredColumns());
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
index f981196..bfe2019 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
/** */
@Value.Enclosing
@@ -53,7 +54,7 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
public static final RelOptRule TABLE_SCAN_SKIP_CORRELATED = Config.TABLE_SCAN_SKIP_CORRELATED.toRule();
/** */
- protected abstract T createNode(
+ protected abstract @Nullable T createNode(
RelOptCluster cluster,
T scan,
RelTraitSet traits,
@@ -139,7 +140,12 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
if (RexUtils.isIdentity(projects, tbl.getRowType(typeFactory, requiredColumns), true))
projects = null;
- call.transformTo(createNode(cluster, scan, traits, projects, cond, requiredColumns));
+ T res = createNode(cluster, scan, traits, projects, cond, requiredColumns);
+
+ if (res == null)
+ return;
+
+ call.transformTo(res);
if (!RexUtils.hasCorrelation(relProject.getProjects()))
cluster.getPlanner().prune(relProject);
@@ -188,7 +194,7 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
}
/** {@inheritDoc} */
- @Override protected IgniteLogicalIndexScan createNode(
+ @Override protected @Nullable IgniteLogicalIndexScan createNode(
RelOptCluster cluster,
IgniteLogicalIndexScan scan,
RelTraitSet traits,
@@ -196,6 +202,12 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
RexNode cond,
ImmutableBitSet requiredColumns
) {
+ if (scan.getTable().unwrap(IgniteTable.class).isIndexRebuildInProgress()) {
+ cluster.getPlanner().prune(scan);
+
+ return null;
+ }
+
return IgniteLogicalIndexScan.create(
cluster,
traits,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
index 2a3566a..1d03652 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
@@ -60,6 +60,9 @@ public class CacheTableImpl extends AbstractTable implements IgniteCacheTable {
/** */
private final Map<String, IgniteIndex> indexes = new ConcurrentHashMap<>();
+ /** */
+ private volatile boolean idxRebuildInProgress;
+
/**
* @param ctx Kernal context.
* @param desc Table descriptor.
@@ -152,6 +155,16 @@ public class CacheTableImpl extends AbstractTable implements IgniteCacheTable {
}
/** {@inheritDoc} */
+ @Override public void markIndexRebuildInProgress(boolean mark) {
+ idxRebuildInProgress = mark;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isIndexRebuildInProgress() {
+ return idxRebuildInProgress;
+ }
+
+ /** {@inheritDoc} */
@Override public <C> C unwrap(Class<C> aCls) {
if (aCls.isInstance(desc))
return aCls.cast(desc);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index d67c891..b42f4fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -141,4 +141,18 @@ public interface IgniteTable extends TranslatableTable {
* Is table modifiable.
*/
boolean isModifiable();
+
+ /**
+ * Mark table for index rebuild.
+ *
+ * @param mark Mark/unmark flag, {@code true} if index rebuild started, {@code false} if finished.
+ */
+ void markIndexRebuildInProgress(boolean mark);
+
+ /**
+ * Returns index rebuild flag.
+ *
+ * @return {@code True} if index rebuild in progress.
+ */
+ boolean isIndexRebuildInProgress();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 69876e9..f755c55 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -266,6 +266,28 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
}
/** {@inheritDoc} */
+ @Override public void onIndexRebuildStarted(String schemaName, String tblName) {
+ IgniteSchema schema = igniteSchemas.get(schemaName);
+ assert schema != null;
+
+ IgniteTable tbl = (IgniteTable)schema.getTable(tblName);
+ assert tbl != null;
+
+ tbl.markIndexRebuildInProgress(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIndexRebuildFinished(String schemaName, String tblName) {
+ IgniteSchema schema = igniteSchemas.get(schemaName);
+ assert schema != null;
+
+ IgniteTable tbl = (IgniteTable)schema.getTable(tblName);
+ assert tbl != null;
+
+ tbl.markIndexRebuildInProgress(false);
+ }
+
+ /** {@inheritDoc} */
@Override public void onFunctionCreated(String schemaName, String name, Method method) {
IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
index 3f1056c..fee4ca5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
@@ -153,6 +153,16 @@ public class SystemViewTableImpl extends AbstractTable implements IgniteTable {
return false;
}
+ /** {@inheritDoc} */
+ @Override public void markIndexRebuildInProgress(boolean mark) {
+ throw new AssertionError("Index rebuild in progress was marked for system view");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isIndexRebuildInProgress() {
+ return false;
+ }
+
/** */
private static class StatisticsImpl implements Statistic {
/** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
new file mode 100644
index 0000000..b3d94b9
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.planner.TestTable;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * Test LogicalRelImplementor class.
+ */
+public class LogicalRelImplementorTest extends GridCommonAbstractTest {
+ /**
+ * Checks which chain of execution nodes {@link LogicalRelImplementor} builds for an {@link IgniteIndexScan}
+ * when the table is marked as having index rebuild in progress. Each case creates a scan with a different
+ * combination of collation, projects and filters and verifies the resulting nodes (project, spool, sort, scan)
+ * from the root down to the leaf.
+ */
+ @Test
+ public void testIndexScanRewriter() {
+ IgniteTypeFactory tf = Commons.typeFactory();
+
+ RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(tf);
+
+ RelDataType sqlTypeInt = tf.createSqlType(SqlTypeName.INTEGER);
+ RelDataType sqlTypeVarchar = tf.createSqlType(SqlTypeName.VARCHAR);
+
+ // Row type fields by position: 0 - _KEY, 1 - _VAL, 2 - ID, 3 - VAL.
+ b.add("_KEY", tf.createJavaType(Object.class));
+ b.add("_VAL", tf.createJavaType(Object.class));
+ b.add("ID", sqlTypeInt);
+ b.add("VAL", sqlTypeVarchar);
+
+ RelDataType rowType = b.build();
+
+ ScanAwareTable tbl = new ScanAwareTable(rowType);
+
+ // Presumably an index over field #2 (ID) - confirm against TestTable#addIndex.
+ tbl.addIndex("IDX", 2);
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ publicSchema.addTable("TBL", tbl);
+
+ BaseQueryContext qctx = BaseQueryContext.builder()
+ .frameworkConfig(
+ newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(createRootSchema(false).add(publicSchema.getName(), publicSchema))
+ .build()
+ )
+ .logger(log)
+ .build();
+
+ UUID nodeId = UUID.randomUUID();
+
+ // Execution context whose colocation group always consists of the single generated node ID.
+ ExecutionContext<Object[]> ectx = new ExecutionContext<Object[]>(
+ qctx,
+ null,
+ null,
+ nodeId,
+ nodeId,
+ null,
+ null,
+ ArrayRowHandler.INSTANCE,
+ null
+ ) {
+ @Override public ColocationGroup group(long srcId) {
+ return ColocationGroup.forNodes(Collections.singletonList(nodeId));
+ }
+ };
+
+ LogicalRelImplementor<Object[]> relImplementor = new LogicalRelImplementor<>(
+ ectx,
+ null,
+ null,
+ null,
+ null
+ );
+
+ // Construct relational operator corresponding to SQL: "SELECT val, id, id + 1 FROM TBL WHERE id = 1"
+ RelOptCluster cluster = Commons.emptyCluster();
+ RexBuilder rexBuilder = cluster.getRexBuilder();
+
+ // Projects, filters and required columns.
+ // Local refs are relative to the required columns declared below: 0 - ID, 1 - VAL.
+ List<RexNode> project = F.asList(
+ rexBuilder.makeLocalRef(sqlTypeVarchar, 1),
+ rexBuilder.makeLocalRef(sqlTypeInt, 0),
+ rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
+ rexBuilder.makeLocalRef(sqlTypeInt, 0),
+ rexBuilder.makeLiteral(1, sqlTypeInt))
+ );
+
+ RexNode filter = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeLocalRef(sqlTypeInt, 0),
+ rexBuilder.makeLiteral(1, sqlTypeInt)
+ );
+
+ ImmutableBitSet requiredColumns = ImmutableBitSet.of(2, 3);
+
+ // Collations.
+ RelCollation idxCollation = tbl.getIndex("IDX").collation();
+
+ RelCollation colCollation = idxCollation.apply(Mappings.target(requiredColumns.asList(),
+ tbl.getRowType(tf).getFieldCount()));
+
+ RelCollation projCollation = TraitUtils.projectCollation(colCollation, RexUtils.replaceLocalRefs(project),
+ tbl.getRowType(tf, requiredColumns));
+
+ RelCollation emptyCollation = RelCollations.of();
+
+ // Correlated projects and filters.
+ // The shuttle replaces every literal with a field access to "ID" of correlation variable #0.
+ RexShuttle replaceLiteralToCorr = new RexShuttle() {
+ @Override public RexNode visitLiteral(RexLiteral literal) {
+ return rexBuilder.makeFieldAccess(
+ rexBuilder.makeCorrel(tbl.getRowType(tf), new CorrelationId(0)), "ID", false);
+ }
+ };
+
+ RexNode corrFilter = replaceLiteralToCorr.apply(filter);
+ List<RexNode> corrProject = F.asList(project.get(0), project.get(1), replaceLiteralToCorr.apply(project.get(2)));
+
+ // All checks below run with the table reporting index rebuild in progress.
+ tbl.markIndexRebuildInProgress(true);
+
+ // Scan predicates rely on the flags recorded by ScanAwareTable#scan during the last implementor visit.
+ Predicate<Node<Object[]>> isScanNoFilterNoProject =
+ node -> node instanceof ScanNode && !tbl.lastScanHasFilter && !tbl.lastScanHasProject;
+ Predicate<Node<Object[]>> isScanWithFilterNoProject =
+ node -> node instanceof ScanNode && tbl.lastScanHasFilter && !tbl.lastScanHasProject;
+ Predicate<Node<Object[]>> isScanWithProjectNoFilter =
+ node -> node instanceof ScanNode && !tbl.lastScanHasFilter && tbl.lastScanHasProject;
+ Predicate<Node<Object[]>> isScanWithFilterWithProject =
+ node -> node instanceof ScanNode && tbl.lastScanHasFilter && tbl.lastScanHasProject;
+
+ Predicate<Node<Object[]>> isSort = node -> node instanceof SortNode;
+ Predicate<Node<Object[]>> isSpool = node -> node instanceof IndexSpoolNode;
+ Predicate<Node<Object[]>> isProj = node -> node instanceof ProjectNode;
+
+ // Template scan: source of cluster, table, index name, index conditions and index collation for createScan().
+ IgniteIndexScan templateScan = new IgniteIndexScan(
+ cluster,
+ cluster.traitSet(),
+ qctx.catalogReader().getTable(F.asList("PUBLIC", "TBL")),
+ "IDX",
+ project,
+ filter,
+ RexUtils.buildSortedIndexConditions(cluster, idxCollation, filter, rowType, requiredColumns),
+ requiredColumns,
+ idxCollation
+ );
+
+ IgniteIndexScan scan;
+
+ // IndexScan without filters and projects transforms to scan and sort.
+ scan = createScan(templateScan, idxCollation, null, null, null);
+ checkNodesChain(relImplementor, scan, isSort, isScanNoFilterNoProject);
+
+ scan = createScan(templateScan, projCollation, null, null, requiredColumns);
+ checkNodesChain(relImplementor, scan, isSort, isScanNoFilterNoProject);
+
+ // IndexScan with simple filters and projects transforms to scan and sort.
+ scan = createScan(templateScan, projCollation, project, filter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isSort, isScanWithFilterWithProject);
+
+ scan = createScan(templateScan, colCollation, null, filter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isSort, isScanWithFilterNoProject);
+
+ scan = createScan(templateScan, projCollation, project, null, requiredColumns);
+ checkNodesChain(relImplementor, scan, isSort, isScanWithProjectNoFilter);
+
+ // IndexScan with correlated filter without project transforms to scan, sort and spool.
+ scan = createScan(templateScan, projCollation, null, corrFilter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isSpool, isSort, isScanNoFilterNoProject);
+
+ // Same case as above, but the scan carries the index collation trait instead of the projected collation.
+ scan = createScan(templateScan, idxCollation, null, corrFilter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isSpool, isSort, isScanNoFilterNoProject);
+
+ // IndexScan with correlated filter with project transforms to scan, sort, spool and project.
+ scan = createScan(templateScan, projCollation, project, corrFilter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+ // IndexScan with correlated project transforms to scan, sort, spool and project.
+ scan = createScan(templateScan, projCollation, corrProject, null, requiredColumns);
+ checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+ scan = createScan(templateScan, projCollation, corrProject, filter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanWithFilterNoProject);
+
+ scan = createScan(templateScan, projCollation, corrProject, corrFilter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+ // IndexScan with simple project without collation transforms to scan.
+ List<RexNode> unknownCollationProject = new ArrayList<>(1);
+ unknownCollationProject.add(corrProject.get(0)); // Field "val".
+
+ scan = createScan(templateScan, emptyCollation, unknownCollationProject, filter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isScanWithFilterWithProject);
+
+ scan = createScan(templateScan, emptyCollation, unknownCollationProject, null, requiredColumns);
+ checkNodesChain(relImplementor, scan, isScanWithProjectNoFilter);
+
+ // IndexScan with correlated project without collation transforms to scan, sort, spool and project.
+ List<RexNode> unknownCollationCorrProject = new ArrayList<>(1);
+ unknownCollationCorrProject.add(corrProject.get(2)); // Field "id + $cor0.id".
+
+ scan = createScan(templateScan, emptyCollation, unknownCollationCorrProject, filter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanWithFilterNoProject);
+
+ scan = createScan(templateScan, emptyCollation, unknownCollationCorrProject, corrFilter, requiredColumns);
+ checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+ scan = createScan(templateScan, emptyCollation, unknownCollationCorrProject, null, requiredColumns);
+ checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+ }
+
+ /**
+ * Builds a new index scan from the template. Cluster, table, index name, index conditions and index
+ * collation are taken from the template; the collation trait, projects, filters and required columns
+ * are taken from the arguments.
+ *
+ * @param templateScan Scan to copy the fixed attributes from.
+ * @param collation Collation to put into the trait set of the new scan.
+ * @param projects Projects of the new scan.
+ * @param filters Filters of the new scan.
+ * @param requiredColumns Required columns of the new scan.
+ * @return New index scan.
+ */
+ private IgniteIndexScan createScan(
+ IgniteIndexScan templateScan,
+ RelCollation collation,
+ List<RexNode> projects,
+ RexNode filters,
+ ImmutableBitSet requiredColumns
+ ) {
+ RelOptCluster scanCluster = templateScan.getCluster();
+
+ IgniteIndexScan res = new IgniteIndexScan(
+ scanCluster,
+ templateScan.getTraitSet().replace(collation),
+ templateScan.getTable(),
+ templateScan.indexName(),
+ projects,
+ filters,
+ templateScan.indexConditions(),
+ requiredColumns,
+ templateScan.collation()
+ );
+
+ return res;
+ }
+
+ /**
+ * Implements the scan and walks the resulting node chain from the root through the first source of each node,
+ * matching every visited node against the predicate at the same position.
+ *
+ * @param relImplementor Implementor under test.
+ * @param scan Index scan to implement.
+ * @param predicates Expected nodes, from the root to the leaf. Their count must equal the chain length.
+ */
+ private <Row> void checkNodesChain(
+ LogicalRelImplementor<Row> relImplementor,
+ IgniteIndexScan scan,
+ Predicate<Node<Row>>... predicates
+ ) {
+ Node<Row> cur = relImplementor.visit(scan);
+
+ // Becomes true once a node without sources (the leaf) has been matched.
+ boolean chainEnded = false;
+
+ for (int i = 0; i < predicates.length; i++) {
+ assertFalse("Not enough nodes", chainEnded);
+ assertTrue("Node " + cur + " doesn't match predicate", predicates[i].test(cur));
+
+ if (F.isEmpty(cur.sources()))
+ chainEnded = true;
+ else
+ cur = cur.sources().get(0);
+ }
+
+ assertTrue("Too much nodes", chainEnded);
+ }
+
+ /** */
+ private static class ScanAwareTable extends TestTable {
+ /** */
+ private volatile boolean lastScanHasFilter;
+
+ /** */
+ private volatile boolean lastScanHasProject;
+
+ /** */
+ public ScanAwareTable(RelDataType rowType) {
+ super(rowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <Row> Iterable<Row> scan(
+ ExecutionContext<Row> execCtx,
+ ColocationGroup grp,
+ Predicate<Row> filter,
+ Function<Row, Row> transformer,
+ ImmutableBitSet bitSet)
+ {
+ lastScanHasFilter = filter != null;
+ lastScanHasProject = transformer != null;
+ return Collections.emptyList();
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 0da2463..782a05c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -91,9 +91,14 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
/** */
protected QueryChecker assertQuery(String qry) {
+ return assertQuery(client, qry);
+ }
+
+ /** */
+ protected QueryChecker assertQuery(IgniteEx ignite, String qry) {
return new QueryChecker(qry) {
@Override protected QueryEngine getEngine() {
- return Commons.lookupComponent(client.context(), QueryEngine.class);
+ return Commons.lookupComponent(ignite.context(), QueryEngine.class);
}
};
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexRebuildIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexRebuildIntegrationTest.java
new file mode 100644
index 0000000..4d6e986
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexRebuildIntegrationTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
+import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+/**
+ * Test index rebuild.
+ */
+public class IndexRebuildIntegrationTest extends AbstractBasicIntegrationTest {
+ /** Index rebuild init latch. */
+ private static CountDownLatch initLatch;
+
+ /** Index rebuild start latch. */
+ private static CountDownLatch startLatch;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
+
+ return super.getConfiguration(igniteInstanceName).setDataStorageConfiguration(
+ new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true)));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ // Tests of this class address grid(0) and grid(1).
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // With null latches BlockingIndexesRebuildTask neither signals nor waits.
+ initLatch = null;
+ startLatch = null;
+
+ super.beforeTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ // Override super method to skip caches destroy.
+ // The tables are created once in beforeTestsStarted() and are queried by every test of this class.
+ cleanQueryPlanCache();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Cleans the persistence directory, activates the cluster and creates two tables with 100 rows each:
+ * {@code tbl} (cache "test") with three secondary indexes and {@code tbl2} without secondary indexes.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ cleanPersistenceDir();
+
+ super.beforeTestsStarted();
+
+ client.cluster().state(ClusterState.ACTIVE);
+
+ // The cache name is set explicitly: checkRebuildIndexQuery() looks the cache up by the name "test".
+ executeSql("CREATE TABLE tbl (id INT PRIMARY KEY, val VARCHAR, val2 VARCHAR) WITH CACHE_NAME=\"test\"");
+ executeSql("CREATE INDEX idx_id_val ON tbl (id DESC, val)");
+ executeSql("CREATE INDEX idx_id_val2 ON tbl (id, val2 DESC)");
+ executeSql("CREATE INDEX idx_val ON tbl (val DESC)");
+
+ for (int i = 0; i < 100; i++)
+ executeSql("INSERT INTO tbl VALUES (?, ?, ?)", i, "val" + i, "val" + i);
+
+ executeSql("CREATE TABLE tbl2 (id INT PRIMARY KEY, val VARCHAR)");
+
+ for (int i = 0; i < 100; i++)
+ executeSql("INSERT INTO tbl2 VALUES (?, ?)", i, "val" + i);
+ }
+
+ /**
+ * Checks a query issued on the same node where indexes are rebuilt: the plan is expected to contain
+ * an index scan when indexes are valid and a table scan while the rebuild is in progress, with the
+ * same result in both cases.
+ */
+ @Test
+ public void testRebuildOnInitiatorNode() throws Exception {
+ String sql = "SELECT * FROM tbl WHERE id = 0 AND val='val0'";
+
+ QueryChecker validChecker = assertQuery(grid(0), sql)
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns(0, "val0", "val0");
+
+ QueryChecker rebuildingChecker = assertQuery(grid(0), sql)
+ .matches(QueryChecker.containsTableScan("PUBLIC", "TBL"))
+ .returns(0, "val0", "val0");
+
+ checkRebuildIndexQuery(grid(0), validChecker, rebuildingChecker);
+ }
+
+ /**
+ * Checks uncorrelated queries issued on grid(0) while indexes are rebuilt on grid(1). The same checker
+ * is used for the valid and the rebuilding state: the plan keeps the index scan without a sort and
+ * the result stays the same.
+ */
+ @Test
+ public void testRebuildOnRemoteNodeUncorrelated() throws Exception {
+ IgniteEx initNode = grid(0);
+
+ // Uncorrelated, filter by indexed field, without projection (identity projection).
+ for (int i = 0; i < 10; i++) {
+ QueryChecker checker = assertQuery(initNode, "SELECT * FROM tbl WHERE id = ? AND val = ?")
+ .withParams(i, "val" + i)
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns(i, "val" + i, "val" + i);
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+ }
+
+ // Uncorrelated, filter by indexed field, with projection.
+ for (int i = 0; i < 10; i++) {
+ QueryChecker checker = assertQuery(initNode, "SELECT val FROM tbl WHERE id = ? AND val = ?")
+ .withParams(i, "val" + i)
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns("val" + i);
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+ }
+ }
+
+ /**
+ * Checks ordered queries issued on grid(0) while indexes are rebuilt on grid(1). The same checker is used
+ * for the valid and the rebuilding state, so the plan seen on the initiator and the ordered result
+ * must not change during the rebuild.
+ */
+ @Test
+ public void testRebuildOnRemoteNodeSorted() throws Exception {
+ IgniteEx initNode = grid(0);
+
+ // Order by part of index collation, without projection.
+ String sql = "SELECT * FROM tbl WHERE id >= 10 and id <= 15 ORDER BY id DESC";
+
+ QueryChecker checker = assertQuery(initNode, sql)
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IndexSpool")))
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns(15, "val15", "val15").returns(14, "val14", "val14").returns(13, "val13", "val13")
+ .returns(12, "val12", "val12").returns(11, "val11", "val11").returns(10, "val10", "val10")
+ .ordered();
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+
+ // Order by part of index collation, with projection.
+ sql = "SELECT val FROM tbl WHERE id >= 10 and id < 20 ORDER BY id DESC";
+
+ checker = assertQuery(initNode, sql)
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IndexSpool")))
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns("val19").returns("val18").returns("val17").returns("val16").returns("val15")
+ .returns("val14").returns("val13").returns("val12").returns("val11").returns("val10")
+ .ordered();
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+
+ // Order by full index collation, with projection.
+ sql = "SELECT val FROM tbl WHERE id >= 10 and id < 20 ORDER BY id DESC, val";
+
+ checker = assertQuery(initNode, sql)
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IndexSpool")))
+ .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns("val19").returns("val18").returns("val17").returns("val16").returns("val15")
+ .returns("val14").returns("val13").returns("val12").returns("val11").returns("val10")
+ .ordered();
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+
+ // Order by another collation.
+ // IDX_VAL is declared as (val DESC) while the query orders by val ascending, so a sort is expected in the plan.
+ sql = "SELECT * FROM tbl WHERE val BETWEEN 'val10' AND 'val15' ORDER BY val";
+
+ checker = assertQuery(initNode, sql)
+ .matches(QueryChecker.containsSubPlan("IgniteSort"))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_VAL"))
+ .returns(10, "val10", "val10").returns(11, "val11", "val11").returns(12, "val12", "val12")
+ .returns(13, "val13", "val13").returns(14, "val14", "val14").returns(15, "val15", "val15")
+ .ordered();
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+ }
+
+ /**
+ * Checks correlated nested loop joins issued on grid(0) while indexes are rebuilt on grid(1). Merge and
+ * nested loop join converters are disabled by a hint; the same checker is used for the valid and
+ * the rebuilding state.
+ */
+ @Test
+ public void testRebuildOnRemoteNodeCorrelated() throws Exception {
+ IgniteEx initNode = grid(0);
+
+ // Correlated join with correlation in filter, without project.
+ String sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.val " +
+ "FROM tbl2 LEFT JOIN tbl ON tbl.id = tbl2.id AND tbl.val = tbl2.val AND tbl.id % 2 = 0 " +
+ "WHERE tbl2.id BETWEEN 10 AND 19";
+
+ // Only even IDs satisfy the join condition, odd IDs are returned with null on the right side.
+ QueryChecker checker = assertQuery(initNode, sql)
+ .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns(10, "val10").returns(11, null).returns(12, "val12").returns(13, null).returns(14, "val14")
+ .returns(15, null).returns(16, "val16").returns(17, null).returns(18, "val18").returns(19, null);
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+
+ // Correlated join with correlation in filter, with project.
+ sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.val1 " +
+ "FROM tbl2 JOIN (SELECT tbl.val || '-' AS val1, val, id FROM tbl) AS tbl " +
+ "ON tbl.id = tbl2.id AND tbl.val = tbl2.val " +
+ "WHERE tbl2.id BETWEEN 10 AND 12";
+
+ checker = assertQuery(initNode, sql)
+ .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns(10, "val10-").returns(11, "val11-").returns(12, "val12-");
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+ }
+
+ /**
+ * Checks correlated joins issued on grid(0) while indexes are rebuilt on grid(1), for scans whose project
+ * keeps only a subset of the index collation columns. One query is planned over IDX_ID_VAL and the other
+ * over IDX_ID_VAL2; the same checker is used for the valid and the rebuilding state.
+ */
+ @Test
+ public void testRebuildOnRemoteNodeCollationRestore() throws Exception {
+ IgniteEx initNode = grid(0);
+
+ // Correlated join with correlation in filter, with project as a subset of collation.
+ String sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.id1 " +
+ "FROM tbl2 JOIN (SELECT tbl.id + 1 AS id1, id FROM tbl WHERE val >= 'val') AS tbl " +
+ "ON tbl.id = tbl2.id " +
+ "WHERE tbl2.val BETWEEN 'val10' AND 'val12'";
+
+ QueryChecker checker = assertQuery(initNode, sql)
+ .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+ .returns(10, 11).returns(11, 12).returns(12, 13);
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+
+ // Correlated join with correlation in filter, with a project as a subset of collation with DESC ordering.
+ sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.id1 " +
+ "FROM tbl2 JOIN (SELECT tbl.id + 1 AS id1, id FROM tbl WHERE val2 >= 'val') AS tbl " +
+ "ON tbl.id = tbl2.id " +
+ "WHERE tbl2.val BETWEEN 'val10' AND 'val12'";
+
+ checker = assertQuery(initNode, sql)
+ .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+ .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL2"))
+ .returns(10, 11).returns(11, 12).returns(12, 13);
+
+ checkRebuildIndexQuery(grid(1), checker, checker);
+ }
+
+ /**
+ * Runs the checkers before, during and after a forced index rebuild of the cache "test" on the given node.
+ * The rebuild is held inside {@link BlockingIndexesRebuildTask} until the "rebuilding" check is done.
+ *
+ * @param rebuildNode Node to force the index rebuild on.
+ * @param checkerWhenValid Checker executed before the rebuild is started and after it is finished.
+ * @param checkerWhenRebuilding Checker executed while the rebuild task is blocked.
+ * @throws Exception If failed.
+ */
+ private void checkRebuildIndexQuery(
+ IgniteEx rebuildNode,
+ QueryChecker checkerWhenValid,
+ QueryChecker checkerWhenRebuilding
+ ) throws Exception {
+ initLatch = new CountDownLatch(1);
+ startLatch = new CountDownLatch(1);
+
+ GridCacheContext<?, ?> cctx = rebuildNode.context().cache().cache("test").context();
+
+ checkerWhenValid.check();
+
+ // Rebuild is forced from another thread, since the rebuild task blocks on the start latch.
+ GridTestUtils.runAsync(() -> forceRebuildIndexes(rebuildNode, cctx));
+
+ try {
+ // Fail if the rebuild task was not reached in time: otherwise the check below would silently run
+ // against a node that is not rebuilding indexes and the test could pass without testing anything.
+ assertTrue("Index rebuild was not started in time", initLatch.await(10, TimeUnit.SECONDS));
+
+ // Check query cache invalidated after index rebuild start.
+ checkerWhenRebuilding.check();
+ }
+ finally {
+ // Always release the rebuild task, even if the check above failed.
+ startLatch.countDown();
+ }
+
+ indexRebuildFuture(rebuildNode, cctx.cacheId()).get();
+
+ // Check query cache invalidated after index rebuild finish.
+ checkerWhenValid.check();
+ }
+
+ /**
+ * Index rebuild task that signals {@code initLatch} when a rebuild is about to start and then waits on
+ * {@code startLatch} (up to 100 seconds) before delegating to the regular rebuild. A latch that is
+ * {@code null} is skipped.
+ */
+ static class BlockingIndexesRebuildTask extends IndexesRebuildTask {
+ /** {@inheritDoc} */
+ @Override protected void startRebuild(
+ GridCacheContext cctx,
+ GridFutureAdapter<Void> fut,
+ SchemaIndexCacheVisitorClosure clo,
+ IndexRebuildCancelToken cancelTok
+ ) {
+ // Read the static fields once: the test thread may reset them to null concurrently.
+ CountDownLatch init = initLatch;
+ CountDownLatch start = startLatch;
+
+ try {
+ if (init != null)
+ init.countDown();
+
+ if (start != null)
+ start.await(100, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException ignored) {
+ // Restore the interrupt status so that the code up the stack can observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+
+ super.startRebuild(cctx, fut, clo, cancelTok);
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java
index 421b948..dde5390 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java
@@ -57,7 +57,7 @@ public class AbstractAggregatePlannerTest extends AbstractPlannerTest {
/**
* @return PARTITIONED test table (ID, VAL0, VAL1, GRP0, GRP1)
*/
- @NotNull protected AbstractPlannerTest.TestTable createAffinityTable() {
+ @NotNull protected TestTable createAffinityTable() {
IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
return new TestTable(
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 8cc783b..62b1fb0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -22,23 +22,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
-import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelVisitor;
@@ -46,14 +40,9 @@ import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeImpl;
-import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ColumnStrategy;
-import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlNode;
@@ -88,20 +77,13 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
-import org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteStatisticsImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
-import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
@@ -664,7 +646,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
}
/** */
- private BaseQueryContext baseQueryContext(Collection<IgniteSchema> schemas) {
+ protected BaseQueryContext baseQueryContext(Collection<IgniteSchema> schemas) {
SchemaPlus rootSchema = createRootSchema(false);
SchemaPlus dfltSchema = null;
@@ -686,197 +668,6 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
}
/** */
- protected static class TestTable implements IgniteCacheTable {
- /** */
- private final String name;
-
- /** */
- private final RelProtoDataType protoType;
-
- /** */
- private final Map<String, IgniteIndex> indexes = new HashMap<>();
-
- /** */
- private IgniteDistribution distribution;
-
- /** */
- private IgniteStatisticsImpl statistics;
-
- /** */
- private final CacheTableDescriptor desc;
-
- /** */
- TestTable(RelDataType type) {
- this(type, 100.0);
- }
-
- /** */
- TestTable(RelDataType type, double rowCnt) {
- this(UUID.randomUUID().toString(), type, rowCnt);
- }
-
- /** */
- TestTable(String name, RelDataType type, double rowCnt) {
- protoType = RelDataTypeImpl.proto(type);
- statistics = new IgniteStatisticsImpl(new ObjectStatisticsImpl((long)rowCnt, Collections.emptyMap()));
- this.name = name;
-
- desc = new TestTableDescriptor(this::distribution, type);
- }
-
- /**
- * Set table distribution.
- *
- * @param distribution Table distribution to set.
- * @return TestTable for chaining.
- */
- public TestTable setDistribution(IgniteDistribution distribution) {
- this.distribution = distribution;
-
- return this;
- }
-
- /**
- * Set table statistics;
- *
- * @param statistics Statistics to set.
- * @return TestTable for chaining.
- */
- public TestTable setStatistics(IgniteStatisticsImpl statistics) {
- this.statistics = statistics;
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogicalTableScan toRel(
- RelOptCluster cluster,
- RelOptTable relOptTbl,
- @Nullable List<RexNode> proj,
- @Nullable RexNode cond,
- @Nullable ImmutableBitSet requiredColumns
- ) {
- return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), relOptTbl, proj, cond, requiredColumns);
- }
-
- /** {@inheritDoc} */
- @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
- RelDataType rowType = protoType.apply(typeFactory);
-
- if (bitSet != null) {
- RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
- for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1))
- b.add(rowType.getFieldList().get(i));
- rowType = b.build();
- }
-
- return rowType;
- }
-
- /** {@inheritDoc} */
- @Override public Statistic getStatistic() {
- return statistics;
- }
-
- /** {@inheritDoc} */
- @Override public <Row> Iterable<Row> scan(
- ExecutionContext<Row> execCtx,
- ColocationGroup group, Predicate<Row> filter,
- Function<Row, Row> transformer,
- ImmutableBitSet bitSet
- ) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public Schema.TableType getJdbcTableType() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isRolledUp(String col) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean rolledUpColumnValidInsideAgg(
- String column,
- SqlCall call,
- SqlNode parent,
- CalciteConnectionConfig config
- ) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteDistribution distribution() {
- if (distribution != null)
- return distribution;
-
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public CacheTableDescriptor descriptor() {
- return desc;
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, IgniteIndex> indexes() {
- return Collections.unmodifiableMap(indexes);
- }
-
- /** {@inheritDoc} */
- @Override public void addIndex(IgniteIndex idxTbl) {
- indexes.put(idxTbl.name(), idxTbl);
- }
-
- /** */
- public TestTable addIndex(RelCollation collation, String name) {
- indexes.put(name, new CacheIndexImpl(collation, name, null, this));
-
- return this;
- }
-
- /** */
- public TestTable addIndex(String name, int... keys) {
- addIndex(TraitUtils.createCollation(Arrays.stream(keys).boxed().collect(Collectors.toList())), name);
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteIndex getIndex(String idxName) {
- return indexes.get(idxName);
- }
-
- /** {@inheritDoc} */
- @Override public void removeIndex(String idxName) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public void ensureCacheStarted() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean isModifiable() {
- return true;
- }
-
- /** */
- public String name() {
- return name;
- }
- }
-
- /** */
static class TestTableDescriptor implements CacheTableDescriptor {
/** */
private final Supplier<IgniteDistribution> distributionSupp;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexRebuildPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexRebuildPlannerTest.java
new file mode 100644
index 0000000..da86c45
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexRebuildPlannerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Planner test checking which scan (index or table) is planned depending on the table's index rebuild state.
+ */
+public class IndexRebuildPlannerTest extends AbstractPlannerTest {
+ /** Schema created from {@link #tbl}. */
+ private IgniteSchema publicSchema;
+
+ /** Test table {@code TBL} with index {@code IDX}. */
+ private TestTable tbl;
+
+ /** {@inheritDoc} */
+ @Override public void setup() {
+ super.setup();
+
+ tbl = createTable("TBL", 100, IgniteDistributions.single(), "ID", Integer.class, "VAL", String.class)
+ .addIndex("IDX", 0);
+
+ publicSchema = createSchema(tbl);
+ }
+
+ /** Checks that an index scan is planned normally and a table scan while index rebuild is marked in progress. */
+ @Test
+ public void testIndexRebuild() throws Exception {
+ String sql = "SELECT * FROM TBL WHERE id = 0";
+
+ assertPlan(sql, publicSchema, isInstanceOf(IgniteIndexScan.class));
+
+ tbl.markIndexRebuildInProgress(true);
+
+ assertPlan(sql, publicSchema, isInstanceOf(IgniteTableScan.class));
+
+ tbl.markIndexRebuildInProgress(false);
+
+ assertPlan(sql, publicSchema, isInstanceOf(IgniteIndexScan.class));
+ }
+
+ /** Checks that planning yields a table scan or an index scan while the rebuild flag is toggled concurrently. */
+ @Test
+ public void testConcurrentIndexRebuildStateChange() throws Exception {
+ String sql = "SELECT * FROM TBL WHERE id = 0";
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ while (!stop.get()) {
+ tbl.markIndexRebuildInProgress(true);
+ tbl.markIndexRebuildInProgress(false);
+ }
+ });
+
+ try {
+ for (int i = 0; i < 1000; i++) {
+ IgniteRel rel = physicalPlan(sql, publicSchema);
+
+ assertTrue(rel instanceof IgniteTableScan || rel instanceof IgniteIndexScan);
+ }
+ }
+ finally {
+ stop.set(true); // Stop the toggling task even if planning or an assertion failed.
+ }
+
+ fut.get(); // Wait for the toggling task to complete.
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
new file mode 100644
index 0000000..742399c
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -0,0 +1,242 @@
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteStatisticsImpl;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
+import org.jetbrains.annotations.Nullable;
+
+/** Test implementation of {@link IgniteCacheTable}; several methods are not implemented and throw {@link AssertionError}. */
+public class TestTable implements IgniteCacheTable {
+ /** Table name. */
+ private final String name;
+
+ /** Row type prototype built from the type passed to the constructor. */
+ private final RelProtoDataType protoType;
+
+ /** Indexes by name. */
+ private final Map<String, IgniteIndex> indexes = new HashMap<>();
+
+ /** Table distribution, {@code null} until {@link #setDistribution(IgniteDistribution)} is called. */
+ private IgniteDistribution distribution;
+
+ /** Table statistics returned by {@link #getStatistic()}. */
+ private IgniteStatisticsImpl statistics;
+
+ /** Table descriptor. */
+ private final CacheTableDescriptor desc;
+
+ /** Index rebuild in progress flag, set by {@link #markIndexRebuildInProgress(boolean)}. */
+ private volatile boolean idxRebuildInProgress;
+
+ /** Creates a table with a random name, the given row type and row count {@code 100}. */
+ protected TestTable(RelDataType type) {
+ this(type, 100.0);
+ }
+
+ /** Creates a table with a random (UUID) name, the given row type and the given row count. */
+ protected TestTable(String name, RelDataType type, double rowCnt) {
+ protoType = RelDataTypeImpl.proto(type);
+ statistics = new IgniteStatisticsImpl(new ObjectStatisticsImpl((long)rowCnt, Collections.emptyMap()));
+ this.name = name;
+
+ desc = new AbstractPlannerTest.TestTableDescriptor(this::distribution, type);
+ }
+
+ /**
+ * Set table distribution.
+ *
+ * @param distribution Table distribution to set.
+ * @return TestTable for chaining.
+ */
+ public TestTable setDistribution(IgniteDistribution distribution) {
+ this.distribution = distribution;
+
+ return this;
+ }
+
+ /**
+ * Set table statistics.
+ *
+ * @param statistics Statistics to set.
+ * @return TestTable for chaining.
+ */
+ public TestTable setStatistics(IgniteStatisticsImpl statistics) {
+ this.statistics = statistics;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogicalTableScan toRel(
+ RelOptCluster cluster,
+ RelOptTable relOptTbl,
+ @Nullable List<RexNode> proj,
+ @Nullable RexNode cond,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), relOptTbl, proj, cond, requiredColumns);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
+ RelDataType rowType = protoType.apply(typeFactory);
+
+ if (bitSet != null) {
+ RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
+ for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1)) // Keep only fields whose bits are set.
+ b.add(rowType.getFieldList().get(i));
+ rowType = b.build();
+ }
+
+ return rowType;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Statistic getStatistic() {
+ return statistics;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <Row> Iterable<Row> scan(
+ ExecutionContext<Row> execCtx,
+ ColocationGroup grp, Predicate<Row> filter,
+ Function<Row, Row> transformer,
+ ImmutableBitSet bitSet
+ ) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Schema.TableType getJdbcTableType() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRolledUp(String col) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rolledUpColumnValidInsideAgg(
+ String column,
+ SqlCall call,
+ SqlNode parent,
+ CalciteConnectionConfig cfg
+ ) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteDistribution distribution() {
+ if (distribution != null)
+ return distribution;
+
+ throw new AssertionError(); // Distribution has not been set.
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheTableDescriptor descriptor() {
+ return desc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, IgniteIndex> indexes() {
+ return Collections.unmodifiableMap(indexes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addIndex(IgniteIndex idxTbl) {
+ indexes.put(idxTbl.name(), idxTbl);
+ }
+
+ /** Adds an index with the given collation and name, returns this table for chaining. */
+ public TestTable addIndex(RelCollation collation, String name) {
+ indexes.put(name, new CacheIndexImpl(collation, name, null, this));
+
+ return this;
+ }
+
+ /** Adds an index with the given name and a collation created from the given keys, returns this table for chaining. */
+ public TestTable addIndex(String name, int... keys) {
+ addIndex(TraitUtils.createCollation(Arrays.stream(keys).boxed().collect(Collectors.toList())), name);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteIndex getIndex(String idxName) {
+ return indexes.get(idxName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeIndex(String idxName) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ensureCacheStarted() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isModifiable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markIndexRebuildInProgress(boolean mark) {
+ idxRebuildInProgress = mark;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isIndexRebuildInProgress() {
+ return idxRebuildInProgress;
+ }
+
+ /** @return Table name. */
+ public String name() {
+ return name;
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 6338aef..8633d4d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.LogicalRelImplementorTest;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteSqlFunctionsTest;
import org.apache.ignite.internal.processors.query.calcite.sql.SqlDdlParserTest;
import org.junit.runner.RunWith;
@@ -37,6 +38,7 @@ import org.junit.runners.Suite;
QueryCheckerTest.class,
SqlDdlParserTest.class,
IgniteSqlFunctionsTest.class,
+ LogicalRelImplementorTest.class,
ScriptTestSuite.class,
})
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 73b044a..e51017c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.CalciteEr
import org.apache.ignite.internal.processors.query.calcite.integration.CorrelatesIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.HashSpoolIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.IndexDdlIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.IndexRebuildIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.IndexScanlIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.IntervalTest;
@@ -97,6 +98,7 @@ import org.junit.runners.Suite;
UserDefinedFunctionsIntegrationTest.class,
CorrelatesIntegrationTest.class,
SystemViewsIntegrationTest.class,
+ IndexRebuildIntegrationTest.class,
})
public class IntegrationTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 5de95d3..524ac14 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNes
import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedSubqueryPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.IndexRebuildPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.JoinCommutePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.JoinWithUsingPlannerTest;
@@ -67,6 +68,7 @@ import org.junit.runners.Suite;
CorrelatedSubqueryPlannerTest.class,
JoinWithUsingPlannerTest.class,
ProjectFilterScanMergePlannerTest.class,
+ IndexRebuildPlannerTest.class,
})
public class PlannerTestSuite {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
index 1334f4f..2783094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
@@ -93,6 +93,22 @@ public interface SchemaChangeListener {
public void onIndexDropped(String schemaName, String tblName, String idxName);
/**
+ * Callback on index rebuild started for all indexes in the table.
+ *
+ * @param schemaName Schema name.
+ * @param tblName Table name.
+ */
+ public void onIndexRebuildStarted(String schemaName, String tblName);
+
+ /**
+ * Callback on index rebuild finished for all indexes in the table.
+ *
+ * @param schemaName Schema name.
+ * @param tblName Table name.
+ */
+ public void onIndexRebuildFinished(String schemaName, String tblName);
+
+ /**
* Callback on function creation.
*
* @param schemaName Schema name.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 9ff8ea6..ee539d9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2073,21 +2073,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public void markAsRebuildNeeded(GridCacheContext cctx, boolean val) {
- markIndexRebuild(cctx.name(), val);
- }
-
- /**
- * Mark tables for index rebuild, so that their indexes are not used.
- *
- * @param cacheName Cache name.
- * @param val Value.
- */
- private void markIndexRebuild(String cacheName, boolean val) {
- for (H2TableDescriptor tblDesc : schemaMgr.tablesForCache(cacheName)) {
- assert tblDesc.table() != null;
-
- tblDesc.table().markRebuildFromHashInProgress(val);
- }
+ schemaMgr.markIndexRebuild(cctx.name(), val);
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
index 528a772..0101c5f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
@@ -882,6 +882,25 @@ public class SchemaManager implements GridQuerySchemaManager {
return null;
}
+ /**
+ * Mark tables of the given cache for index rebuild, so that their indexes are not used, and notify the listener for each table.
+ *
+ * @param cacheName Cache name.
+ * @param mark Mark/unmark flag, {@code true} if index rebuild started, {@code false} if finished.
+ */
+ public void markIndexRebuild(String cacheName, boolean mark) {
+ for (H2TableDescriptor tblDesc : tablesForCache(cacheName)) {
+ assert tblDesc.table() != null;
+
+ tblDesc.table().markRebuildFromHashInProgress(mark);
+
+ if (mark)
+ lsnr.onIndexRebuildStarted(tblDesc.schemaName(), tblDesc.tableName());
+ else
+ lsnr.onIndexRebuildFinished(tblDesc.schemaName(), tblDesc.tableName());
+ }
+ }
+
/** {@inheritDoc} */
@Override public GridQueryTypeDescriptor typeDescriptorForTable(String schemaName, String tableName) {
GridH2Table dataTable = dataTable(schemaName, tableName);
@@ -929,6 +948,12 @@ public class SchemaManager implements GridQuerySchemaManager {
@Override public void onIndexDropped(String schemaName, String tblName, String idxName) {}
/** {@inheritDoc} */
+ @Override public void onIndexRebuildStarted(String schemaName, String tblName) {}
+
+ /** {@inheritDoc} */
+ @Override public void onIndexRebuildFinished(String schemaName, String tblName) {}
+
+ /** {@inheritDoc} */
@Override public void onSqlTypeCreated(
String schemaName,
GridQueryTypeDescriptor typeDesc,
@@ -1025,6 +1050,16 @@ public class SchemaManager implements GridQuerySchemaManager {
}
/** {@inheritDoc} */
+ @Override public void onIndexRebuildStarted(String schemaName, String tblName) {
+ lsnrs.forEach(lsnr -> lsnr.onIndexRebuildStarted(schemaName, tblName));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIndexRebuildFinished(String schemaName, String tblName) {
+ lsnrs.forEach(lsnr -> lsnr.onIndexRebuildFinished(schemaName, tblName));
+ }
+
+ /** {@inheritDoc} */
@Override public void onFunctionCreated(String schemaName, String name, Method method) {
lsnrs.forEach(lsnr -> lsnr.onFunctionCreated(schemaName, name, method));
}