You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/05/19 11:38:49 UTC

[GitHub] [ignite] gvvinblade commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

gvvinblade commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r426721220



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Sort node.
+ */
+public class SortNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Downstream<Object[]> {
+    /** How many rows are requested by downstream. */
+    private int requested;
+
+    /** How many rows are we waiting for from the upstream. {@code -1} means end of stream. */
+    private int waiting;
+
+    /**  */
+    private boolean inLoop;
+
+    /** Rows comparator. */
+    private final Comparator<Object[]> comparator;
+
+    /** Rows buffer. */
+    private final List<Object[]> rows = new ArrayList<>();
+
+    /** Index of next row which buffer will return. {@code -1} means buffer is not sorted yet. */
+    private int curIdx = -1;
+
+    /**
+     * @param ctx Execution context.
+     * @param collation Sort collation.
+     */
+    public SortNode(ExecutionContext ctx, RelCollation collation) {
+        super(ctx);
+        this.comparator = Commons.comparator(collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Object[]> requestDownstream(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCount) {
+        checkThread();
+
+        assert !F.isEmpty(sources) && sources.size() == 1;
+        assert rowsCount > 0 && requested == 0;
+
+        requested = rowsCount;
+
+        if (waiting == -1 && !inLoop)
+            context().execute(this::flushFromBuffer);
+        else if (waiting == 0)
+            F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        else
+            throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void push(Object[] row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting--;
+
+        try {
+            rows.add(row);
+
+            if (waiting == 0)
+                F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void end() {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting = -1;
+
+        try {
+            flushFromBuffer();
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    @Override public void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+    }
+
+    /** */
+    private void flushFromBuffer() {
+        assert waiting == -1;
+
+        if (curIdx == -1 && comparator != null)
+            rows.sort(comparator);

Review comment:
       Isn't heap sort better here? Let's use a PriorityQueue instead of a List as the buffer.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
##########
@@ -127,6 +127,18 @@ public RexBuilder rexBuilder() {
         return new PredicateImpl<>(ctx, scalar(filter, rowType));
     }
 
+    /**
+     * Creates a Filter predicate.
+     * @param ctx Execution context, holds a planner context, query and its parameters,
+     *             execution specific variables (like queryId, current user, session, etc).
+     * @param filters Filters expression.
+     * @param rowType Input row type.
+     * @return Filter predicate.
+     */
+    public <T> Predicate<T> predicate(ExecutionContext ctx, List<RexNode> filters, RelDataType rowType) {

Review comment:
       I see no usages of the method

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
##########
@@ -160,6 +172,24 @@ public RexBuilder rexBuilder() {
         return () -> new ValuesIterator<>(out, rowLen);
     }
 
+    /**
+     * Creates objects row from RexNodes.
+     *
+     * @param ctx Execution context, holds a planner context, query and its parameters,
+     *             execution specific variables (like queryId, current user, session, etc).
+     * @param values Values.
+     * @return Values relational node rows source.
+     */
+    public Object[] convertToObjects(ExecutionContext ctx, List<RexNode> values, RelDataType rowType) {

Review comment:
       Since there was a plan to generalize the factory and the whole execution flow, it would be better if the factory had as few references to the actual row type as possible.
   `public T asRow(...)` is better from my point of view.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
##########
@@ -73,7 +76,14 @@ public IgniteDistribution distribution(IgniteRel rel, RelMetadataQuery mq) {
      * See {@link IgniteMdDistribution#distribution(RelNode, RelMetadataQuery)}
      */
     public IgniteDistribution distribution(TableScan rel, RelMetadataQuery mq) {
-        return rel.getTable().unwrap(DistributedTable.class).distribution();
+        return rel.getTable().unwrap(IgniteTable.class).distribution();

Review comment:
       DistributedTable was introduced to make it easy to use mocks in tests; I would prefer to bring it back.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
##########
@@ -17,33 +17,351 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.calcite.rex.RexUtil.removeCast;
+import static org.apache.calcite.sql.SqlKind.EQUALS;
+import static org.apache.calcite.sql.SqlKind.GREATER_THAN;
+import static org.apache.calcite.sql.SqlKind.GREATER_THAN_OR_EQUAL;
+import static org.apache.calcite.sql.SqlKind.LESS_THAN;
+import static org.apache.calcite.sql.SqlKind.LESS_THAN_OR_EQUAL;
+import static org.apache.calcite.sql.SqlKind.OR;
 
 /**
  * Relational operator that returns the contents of a table.
  */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
 public class IgniteTableScan extends TableScan implements IgniteRel {

Review comment:
       Let's have two classes, one for sorted and one for unsorted tables. Of course, this slightly enlarges almost all visitors, but it makes the code clearer.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
##########
@@ -191,12 +250,14 @@ public LogicalRelImplementor(ExecutionContext ctx, PartitionService partitionSer
 
     /** {@inheritDoc} */
     @Override public Node<Object[]> visit(IgniteReceiver rel) {
-        Inbox<?> inbox = mailboxRegistry.register(
-            new Inbox<>(ctx, exchangeService, mailboxRegistry, rel.exchangeId(), rel.sourceFragmentId()));
+        Inbox inbox = mailboxRegistry.register(
+            new Inbox(ctx, exchangeService, mailboxRegistry, rel.exchangeId(), rel.sourceFragmentId()));
+
+        RelCollation collation = F.isEmpty(rel.collations()) ? null : rel.collations().get(0);
 
         // here may be an already created (to consume rows from remote nodes) inbox
         // without proper context, we need to init it with a right one.
-        inbox.init(ctx, ctx.remoteSources(rel.exchangeId()), expressionFactory.comparator(ctx, rel.collations(), rel.getRowType()));
+        inbox.init(ctx, ctx.remoteSources(rel.exchangeId()), Commons.comparator(collation));

Review comment:
       Move the `Commons.comparator(..)` code into the `ExpressionFactory.comparator(..)` method.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
##########
@@ -37,7 +37,7 @@
 /**
  * A part of exchange.
  */
-public class Inbox<T> extends AbstractNode<T> implements SingleNode<T>, AutoCloseable {
+public class Inbox extends AbstractNode<Object[]> implements SingleNode<Object[]>, AutoCloseable {

Review comment:
       It has to be generic because Object[] isn't a final row type. Actually all execution nodes should not have strong row types (see https://issues.apache.org/jira/browse/IGNITE-12900)

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/RuleUtils.java
##########
@@ -48,7 +47,7 @@
 public class RuleUtils {
     /** */
     public static RelOptRuleOperand traitPropagationOperand(Class<? extends RelNode> clazz) {
-        return operand(clazz, IgniteDistributions.any(), some(operand(RelSubset.class, any())));
+        return operand(clazz, null, some(operand(RelSubset.class, any())));

Review comment:
       It was just a way to reduce possible false matches (because usually only 'ANY' distributed relations are proto- ones).

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectTraitsPropagationRule.java
##########
@@ -50,6 +55,13 @@ public ProjectTraitsPropagationRule() {
         RelTraitSet traits = rel.getTraitSet()
             .replace(IgniteDistributions.project(mq, input, rel.getProjects()));
 
+        if (call.getPlanner().getRelTraitDefs().contains(RelCollationTraitDef.INSTANCE)) {

Review comment:
       Since Collation is a required trait, the condition is unnecessary here.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterTraitsPropagationRule.java
##########
@@ -46,7 +47,9 @@ public FilterTraitsPropagationRule() {
         RelMetadataQuery mq = cluster.getMetadataQuery();
 
         RelTraitSet traits = rel.getTraitSet()
-            .replace(IgniteMdDistribution._distribution(input, mq));
+            .replace(IgniteMdDistribution._distribution(input, mq))
+            .replaceIf(RelCollationTraitDef.INSTANCE,

Review comment:
       I think the Collation trait def is required for us, so an unconditional replace is better here.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
##########
@@ -373,6 +376,49 @@ public static String explain(RelNode rel) {
         }
     }
 
+
+    /**
+     * Makes comparator from collation.
+     *
+     * @param collation Collation.
+     * @return Comparator.
+     */
+    public static Comparator<Object[]> comparator(RelCollation collation) {

Review comment:
       Move it into ExpressionFactory.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
##########
@@ -97,18 +116,78 @@ public String name() {
         return statistic;
     }
 
+    /** */
+    public TableDescriptor descriptor() {
+        return desc;
+    }
+
     /** {@inheritDoc} */
     @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
+
+        return toRel(cluster, relOptTable, PK_INDEX_NAME);
+    }
+
+    /**
+     * Converts table into relational expression.
+     *
+     * @param cluster Custer.
+     * @param relOptTable Table.
+     * @return Table relational expression.
+     */
+    public IgniteTableScan toRel(RelOptCluster cluster, RelOptTable relOptTable, String idxName) {
         RelTraitSet traitSet = cluster.traitSetOf(IgniteConvention.INSTANCE)
-            .replaceIfs(RelCollationTraitDef.INSTANCE, this::collations)
             .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
 
-        return new IgniteTableScan(cluster, traitSet, relOptTable);
+        IgniteIndex idx = indexes.get(idxName);
+
+        if (idx == null)
+            return null;
+
+        traitSet = traitSet.replaceIf(RelCollationTraitDef.INSTANCE, idx::collation);
+
+        return new IgniteTableScan(cluster, traitSet, relOptTable, idxName,  null);
     }
 
-    /** {@inheritDoc} */
-    @Override public NodesMapping mapping(PlanningContext ctx) {
+    /**
+     * @return Indexes for the current table.
+     */
+    public Map<String, IgniteIndex> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Adds index to table.
+     * @param idxTbl Index table.
+     */
+    public void addIndex(IgniteIndex idxTbl) {
+        indexes.put(idxTbl.name(), idxTbl);
+    }
+
+    /**
+     * @param idxName Index name.
+     * @return Index.
+     */
+    public IgniteIndex getIndex(String idxName) {
+        return indexes.get(idxName);
+    }
+
+    /**
+     * @return Column descriptors.
+     */
+    public ColumnDescriptor[] columnDescriptors() {

Review comment:
       This is part of the table descriptor; I don't think we need these delegate methods.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
##########
@@ -31,12 +31,13 @@
 import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
 import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * A part of exchange.
  */
-public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Downstream<T>, AutoCloseable {
+public class Outbox extends AbstractNode<Object[]> implements SingleNode<Object[]>, Downstream<Object[]>, AutoCloseable {

Review comment:
       It has to be generic too.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
##########
@@ -97,18 +116,78 @@ public String name() {
         return statistic;
     }
 
+    /** */
+    public TableDescriptor descriptor() {
+        return desc;
+    }
+
     /** {@inheritDoc} */
     @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
+
+        return toRel(cluster, relOptTable, PK_INDEX_NAME);
+    }
+
+    /**
+     * Converts table into relational expression.
+     *
+     * @param cluster Custer.
+     * @param relOptTable Table.
+     * @return Table relational expression.
+     */
+    public IgniteTableScan toRel(RelOptCluster cluster, RelOptTable relOptTable, String idxName) {
         RelTraitSet traitSet = cluster.traitSetOf(IgniteConvention.INSTANCE)
-            .replaceIfs(RelCollationTraitDef.INSTANCE, this::collations)
             .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
 
-        return new IgniteTableScan(cluster, traitSet, relOptTable);
+        IgniteIndex idx = indexes.get(idxName);
+
+        if (idx == null)
+            return null;
+
+        traitSet = traitSet.replaceIf(RelCollationTraitDef.INSTANCE, idx::collation);
+
+        return new IgniteTableScan(cluster, traitSet, relOptTable, idxName,  null);
     }
 
-    /** {@inheritDoc} */
-    @Override public NodesMapping mapping(PlanningContext ctx) {
+    /**
+     * @return Indexes for the current table.
+     */
+    public Map<String, IgniteIndex> indexes() {

Review comment:
       Let's introduce another interface, like IndexedTable, and put the related methods into it. In the future it will help us use mocks in tests. Another point is that we are going to rethink how we manage services and components and put everything into an IoC container. The right interfaces and abstractions are necessary.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -552,31 +580,73 @@ private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws Valida
         sqlNode = planner.validate(sqlNode);
 
         // Convert to Relational operators graph
-        RelNode rel = planner.convert(sqlNode);
+        IgniteRel igniteRel = optimize(sqlNode, planner);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        return new MultiStepDmlPlan(fragments, fieldsMetadata(ctx, igniteRel.getRowType(), null));
+    }
+
+    /** */
+    private IgniteRel optimize(SqlNode sqlNode, IgnitePlanner planner) {
+        // Convert to Relational operators graph
+        RelRoot root = planner.rel(sqlNode);

Review comment:
       Use `planner.convert(...)` instead (or `RelNode rel = root.project()`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org