You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/05/18 14:52:16 UTC

[GitHub] [ignite] rkondakov opened a new pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

rkondakov opened a new pull request #7813:
URL: https://github.com/apache/ignite/pull/7813


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [x] There is a single JIRA ticket related to the pull request. 
   - [x] The web-link to the pull request is attached to the JIRA ticket.
   - [x] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [x] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-12407: Add Cluster API support to Java thin client`
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] AMashenkov closed pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
AMashenkov closed pull request #7813:
URL: https://github.com/apache/ignite/pull/7813


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] rkondakov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
rkondakov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r427246547



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
##########
@@ -73,7 +76,14 @@ public IgniteDistribution distribution(IgniteRel rel, RelMetadataQuery mq) {
      * See {@link IgniteMdDistribution#distribution(RelNode, RelMetadataQuery)}
      */
     public IgniteDistribution distribution(TableScan rel, RelMetadataQuery mq) {
-        return rel.getTable().unwrap(DistributedTable.class).distribution();
+        return rel.getTable().unwrap(IgniteTable.class).distribution();

Review comment:
       I think it is a redundant entity. We need to keep our API clean.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] AMashenkov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r426693360



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridIndex.java
##########
@@ -14,18 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ignite.internal.processors.query;
 
-package org.apache.ignite.internal.processors.query.calcite.schema;
-
-import java.util.List;
-import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.util.lang.GridCursor;
 
 /**
- *
+ * Index interface.
  */
-public interface SortedTable {
-    /**
-     * @return The table collations.
-     */
-    List<RelCollation> collations();
+public interface GridIndex<T> {
+
+    GridCursor<T> find(T lower, T upper, BPlusTree.TreeRowClosure<T, T> filterClosure);

Review comment:
       Javadoc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] rkondakov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
rkondakov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r428320786



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -552,31 +580,73 @@ private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws Valida
         sqlNode = planner.validate(sqlNode);
 
         // Convert to Relational operators graph
-        RelNode rel = planner.convert(sqlNode);
+        IgniteRel igniteRel = optimize(sqlNode, planner);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        return new MultiStepDmlPlan(fragments, fieldsMetadata(ctx, igniteRel.getRowType(), null));
+    }
+
+    /** */
+    private IgniteRel optimize(SqlNode sqlNode, IgnitePlanner planner) {
+        // Convert to Relational operators graph
+        RelRoot root = planner.rel(sqlNode);

Review comment:
       I need a root node collation later, so I need a root node itself.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] AMashenkov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r426693537



##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
##########
@@ -380,6 +380,23 @@ public boolean rebuildRequired() {
         }
     }
 
+    @Override public GridCursor<H2Row> find(

Review comment:
       Javadoc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] AMashenkov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r426693150



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
##########
@@ -55,4 +58,17 @@
      * @param cacheInfo Cache info.
      */
     void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo<?,?> cacheInfo);
+
+    /**
+     * Callback on index creation.
+     */
+    void onIndexCreate(String schemaName, String tblName, String idxName, GridQueryIndexDescriptor idxDesc,

Review comment:
       javadoc

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
##########
@@ -55,4 +58,17 @@
      * @param cacheInfo Cache info.
      */
     void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo<?,?> cacheInfo);
+
+    /**
+     * Callback on index creation.
+     */
+    void onIndexCreate(String schemaName, String tblName, String idxName, GridQueryIndexDescriptor idxDesc,
+        GridIndex idx);
+
+    /**
+     * Callback on index drop.
+     */
+    void onIndexDrop(String schemaName, String tblName, String idxName);

Review comment:
       javadoc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] rkondakov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
rkondakov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r428520642



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Sort node.
+ */
+public class SortNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Downstream<Object[]> {
+    /** How many rows are requested by downstream. */
+    private int requested;
+
+    /** How many rows are we waiting for from the upstream. {@code -1} means end of stream. */
+    private int waiting;
+
+    /**  */
+    private boolean inLoop;
+
+    /** Rows comparator. */
+    private final Comparator<Object[]> comparator;
+
+    /** Rows buffer. */
+    private final List<Object[]> rows = new ArrayList<>();
+
+    /** Index of next row which buffer will return. {@code -1} means buffer is not sorted yet. */
+    private int curIdx = -1;
+
+    /**
+     * @param ctx Execution context.
+     * @param collation Sort collation.
+     */
+    public SortNode(ExecutionContext ctx, RelCollation collation) {
+        super(ctx);
+        this.comparator = Commons.comparator(collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Object[]> requestDownstream(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCount) {
+        checkThread();
+
+        assert !F.isEmpty(sources) && sources.size() == 1;
+        assert rowsCount > 0 && requested == 0;
+
+        requested = rowsCount;
+
+        if (waiting == -1 && !inLoop)
+            context().execute(this::flushFromBuffer);
+        else if (waiting == 0)
+            F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        else
+            throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void push(Object[] row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting--;
+
+        try {
+            rows.add(row);
+
+            if (waiting == 0)
+                F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void end() {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting = -1;
+
+        try {
+            flushFromBuffer();
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    @Override public void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+    }
+
+    /** */
+    private void flushFromBuffer() {
+        assert waiting == -1;
+
+        if (curIdx == -1 && comparator != null)
+            rows.sort(comparator);

Review comment:
       Though, when `LIMIT` operator is implemented, it will be no needed to sort the entire result set. In this cases `PriorityQueue` is better choice.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] gvvinblade commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
gvvinblade commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r426721220



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Sort node.
+ */
+public class SortNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Downstream<Object[]> {
+    /** How many rows are requested by downstream. */
+    private int requested;
+
+    /** How many rows are we waiting for from the upstream. {@code -1} means end of stream. */
+    private int waiting;
+
+    /**  */
+    private boolean inLoop;
+
+    /** Rows comparator. */
+    private final Comparator<Object[]> comparator;
+
+    /** Rows buffer. */
+    private final List<Object[]> rows = new ArrayList<>();
+
+    /** Index of next row which buffer will return. {@code -1} means buffer is not sorted yet. */
+    private int curIdx = -1;
+
+    /**
+     * @param ctx Execution context.
+     * @param collation Sort collation.
+     */
+    public SortNode(ExecutionContext ctx, RelCollation collation) {
+        super(ctx);
+        this.comparator = Commons.comparator(collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Object[]> requestDownstream(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCount) {
+        checkThread();
+
+        assert !F.isEmpty(sources) && sources.size() == 1;
+        assert rowsCount > 0 && requested == 0;
+
+        requested = rowsCount;
+
+        if (waiting == -1 && !inLoop)
+            context().execute(this::flushFromBuffer);
+        else if (waiting == 0)
+            F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        else
+            throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void push(Object[] row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting--;
+
+        try {
+            rows.add(row);
+
+            if (waiting == 0)
+                F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void end() {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting = -1;
+
+        try {
+            flushFromBuffer();
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    @Override public void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+    }
+
+    /** */
+    private void flushFromBuffer() {
+        assert waiting == -1;
+
+        if (curIdx == -1 && comparator != null)
+            rows.sort(comparator);

Review comment:
       Isn't heap sort better here? Lets use PriorityQueue instead of List as a buffer

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
##########
@@ -127,6 +127,18 @@ public RexBuilder rexBuilder() {
         return new PredicateImpl<>(ctx, scalar(filter, rowType));
     }
 
+    /**
+     * Creates a Filter predicate.
+     * @param ctx Execution context, holds a planner context, query and its parameters,
+     *             execution specific variables (like queryId, current user, session, etc).
+     * @param filters Filters expression.
+     * @param rowType Input row type.
+     * @return Filter predicate.
+     */
+    public <T> Predicate<T> predicate(ExecutionContext ctx, List<RexNode> filters, RelDataType rowType) {

Review comment:
       I see no usages of the method

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
##########
@@ -160,6 +172,24 @@ public RexBuilder rexBuilder() {
         return () -> new ValuesIterator<>(out, rowLen);
     }
 
+    /**
+     * Creates objects row from RexNodes.
+     *
+     * @param ctx Execution context, holds a planner context, query and its parameters,
+     *             execution specific variables (like queryId, current user, session, etc).
+     * @param values Values.
+     * @return Values relational node rows source.
+     */
+    public Object[] convertToObjects(ExecutionContext ctx, List<RexNode> values, RelDataType rowType) {

Review comment:
       Since there was a plan to generalize the factory and whole execution flow, it would be better if the factory has as few references to actual row type as possible.
   `public T asRow(...)` is better from my point of view

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
##########
@@ -73,7 +76,14 @@ public IgniteDistribution distribution(IgniteRel rel, RelMetadataQuery mq) {
      * See {@link IgniteMdDistribution#distribution(RelNode, RelMetadataQuery)}
      */
     public IgniteDistribution distribution(TableScan rel, RelMetadataQuery mq) {
-        return rel.getTable().unwrap(DistributedTable.class).distribution();
+        return rel.getTable().unwrap(IgniteTable.class).distribution();

Review comment:
       DistributedTable was introduced to use mocks easily in tests, I would prefer to return it back

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
##########
@@ -17,33 +17,351 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.calcite.rex.RexUtil.removeCast;
+import static org.apache.calcite.sql.SqlKind.EQUALS;
+import static org.apache.calcite.sql.SqlKind.GREATER_THAN;
+import static org.apache.calcite.sql.SqlKind.GREATER_THAN_OR_EQUAL;
+import static org.apache.calcite.sql.SqlKind.LESS_THAN;
+import static org.apache.calcite.sql.SqlKind.LESS_THAN_OR_EQUAL;
+import static org.apache.calcite.sql.SqlKind.OR;
 
 /**
  * Relational operator that returns the contents of a table.
  */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
 public class IgniteTableScan extends TableScan implements IgniteRel {

Review comment:
       let's have two classes - for sorted and unsorted tables, of course it increases a bit almost all visitors, but makes code clearer

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
##########
@@ -191,12 +250,14 @@ public LogicalRelImplementor(ExecutionContext ctx, PartitionService partitionSer
 
     /** {@inheritDoc} */
     @Override public Node<Object[]> visit(IgniteReceiver rel) {
-        Inbox<?> inbox = mailboxRegistry.register(
-            new Inbox<>(ctx, exchangeService, mailboxRegistry, rel.exchangeId(), rel.sourceFragmentId()));
+        Inbox inbox = mailboxRegistry.register(
+            new Inbox(ctx, exchangeService, mailboxRegistry, rel.exchangeId(), rel.sourceFragmentId()));
+
+        RelCollation collation = F.isEmpty(rel.collations()) ? null : rel.collations().get(0);
 
         // here may be an already created (to consume rows from remote nodes) inbox
         // without proper context, we need to init it with a right one.
-        inbox.init(ctx, ctx.remoteSources(rel.exchangeId()), expressionFactory.comparator(ctx, rel.collations(), rel.getRowType()));
+        inbox.init(ctx, ctx.remoteSources(rel.exchangeId()), Commons.comparator(collation));

Review comment:
       Move `Commons.comparator(..)` code into `ExpressionFactory.comparator(..)` method

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
##########
@@ -37,7 +37,7 @@
 /**
  * A part of exchange.
  */
-public class Inbox<T> extends AbstractNode<T> implements SingleNode<T>, AutoCloseable {
+public class Inbox extends AbstractNode<Object[]> implements SingleNode<Object[]>, AutoCloseable {

Review comment:
       It has to be generic because Object[] isn't a final row type. Actually all execution nodes should not have strong row types (see https://issues.apache.org/jira/browse/IGNITE-12900)

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/RuleUtils.java
##########
@@ -48,7 +47,7 @@
 public class RuleUtils {
     /** */
     public static RelOptRuleOperand traitPropagationOperand(Class<? extends RelNode> clazz) {
-        return operand(clazz, IgniteDistributions.any(), some(operand(RelSubset.class, any())));
+        return operand(clazz, null, some(operand(RelSubset.class, any())));

Review comment:
       It was just a way to decrease possible false matchings (because usually only 'ANY' distributed relations are proto- ones)

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectTraitsPropagationRule.java
##########
@@ -50,6 +55,13 @@ public ProjectTraitsPropagationRule() {
         RelTraitSet traits = rel.getTraitSet()
             .replace(IgniteDistributions.project(mq, input, rel.getProjects()));
 
+        if (call.getPlanner().getRelTraitDefs().contains(RelCollationTraitDef.INSTANCE)) {

Review comment:
       Since Collation is a required trait, a condition is useless here.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterTraitsPropagationRule.java
##########
@@ -46,7 +47,9 @@ public FilterTraitsPropagationRule() {
         RelMetadataQuery mq = cluster.getMetadataQuery();
 
         RelTraitSet traits = rel.getTraitSet()
-            .replace(IgniteMdDistribution._distribution(input, mq));
+            .replace(IgniteMdDistribution._distribution(input, mq))
+            .replaceIf(RelCollationTraitDef.INSTANCE,

Review comment:
       I think Collation trait def is required for us, non-conditional replace is better here

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
##########
@@ -373,6 +376,49 @@ public static String explain(RelNode rel) {
         }
     }
 
+
+    /**
+     * Makes comparator from collation.
+     *
+     * @param collation Collation.
+     * @return Comparator.
+     */
+    public static Comparator<Object[]> comparator(RelCollation collation) {

Review comment:
       Move it into ExpressionsFactory

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
##########
@@ -97,18 +116,78 @@ public String name() {
         return statistic;
     }
 
+    /** */
+    public TableDescriptor descriptor() {
+        return desc;
+    }
+
     /** {@inheritDoc} */
     @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
+
+        return toRel(cluster, relOptTable, PK_INDEX_NAME);
+    }
+
+    /**
+     * Converts table into relational expression.
+     *
+     * @param cluster Custer.
+     * @param relOptTable Table.
+     * @return Table relational expression.
+     */
+    public IgniteTableScan toRel(RelOptCluster cluster, RelOptTable relOptTable, String idxName) {
         RelTraitSet traitSet = cluster.traitSetOf(IgniteConvention.INSTANCE)
-            .replaceIfs(RelCollationTraitDef.INSTANCE, this::collations)
             .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
 
-        return new IgniteTableScan(cluster, traitSet, relOptTable);
+        IgniteIndex idx = indexes.get(idxName);
+
+        if (idx == null)
+            return null;
+
+        traitSet = traitSet.replaceIf(RelCollationTraitDef.INSTANCE, idx::collation);
+
+        return new IgniteTableScan(cluster, traitSet, relOptTable, idxName,  null);
     }
 
-    /** {@inheritDoc} */
-    @Override public NodesMapping mapping(PlanningContext ctx) {
+    /**
+     * @return Indexes for the current table.
+     */
+    public Map<String, IgniteIndex> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Adds index to table.
+     * @param idxTbl Index table.
+     */
+    public void addIndex(IgniteIndex idxTbl) {
+        indexes.put(idxTbl.name(), idxTbl);
+    }
+
+    /**
+     * @param idxName Index name.
+     * @return Index.
+     */
+    public IgniteIndex getIndex(String idxName) {
+        return indexes.get(idxName);
+    }
+
+    /**
+     * @return Column descriptors.
+     */
+    public ColumnDescriptor[] columnDescriptors() {

Review comment:
       this is a part of table descriptor, I don't think we we need this delegate methods.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
##########
@@ -31,12 +31,13 @@
 import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
 import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * A part of exchange.
  */
-public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Downstream<T>, AutoCloseable {
+public class Outbox extends AbstractNode<Object[]> implements SingleNode<Object[]>, Downstream<Object[]>, AutoCloseable {

Review comment:
       It has to be generic too.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
##########
@@ -97,18 +116,78 @@ public String name() {
         return statistic;
     }
 
+    /** */
+    public TableDescriptor descriptor() {
+        return desc;
+    }
+
     /** {@inheritDoc} */
     @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
+
+        return toRel(cluster, relOptTable, PK_INDEX_NAME);
+    }
+
+    /**
+     * Converts table into relational expression.
+     *
+     * @param cluster Custer.
+     * @param relOptTable Table.
+     * @return Table relational expression.
+     */
+    public IgniteTableScan toRel(RelOptCluster cluster, RelOptTable relOptTable, String idxName) {
         RelTraitSet traitSet = cluster.traitSetOf(IgniteConvention.INSTANCE)
-            .replaceIfs(RelCollationTraitDef.INSTANCE, this::collations)
             .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
 
-        return new IgniteTableScan(cluster, traitSet, relOptTable);
+        IgniteIndex idx = indexes.get(idxName);
+
+        if (idx == null)
+            return null;
+
+        traitSet = traitSet.replaceIf(RelCollationTraitDef.INSTANCE, idx::collation);
+
+        return new IgniteTableScan(cluster, traitSet, relOptTable, idxName,  null);
     }
 
-    /** {@inheritDoc} */
-    @Override public NodesMapping mapping(PlanningContext ctx) {
+    /**
+     * @return Indexes for the current table.
+     */
+    public Map<String, IgniteIndex> indexes() {

Review comment:
       Lets introduce another interface like IndexedTable and put related methods into it, in future it will help us to use mocks in tests, another point is that we are going to rethink how we manage services and components and put everything into an IoC container. Right interfaces and abstractions are necessary.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -552,31 +580,73 @@ private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws Valida
         sqlNode = planner.validate(sqlNode);
 
         // Convert to Relational operators graph
-        RelNode rel = planner.convert(sqlNode);
+        IgniteRel igniteRel = optimize(sqlNode, planner);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        return new MultiStepDmlPlan(fragments, fieldsMetadata(ctx, igniteRel.getRowType(), null));
+    }
+
+    /** */
+    private IgniteRel optimize(SqlNode sqlNode, IgnitePlanner planner) {
+        // Convert to Relational operators graph
+        RelRoot root = planner.rel(sqlNode);

Review comment:
       Use `planner.convert(...)` instead (or `RelNode rel = root.project()`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] rkondakov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
rkondakov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r427246143



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Sort node.
+ */
+public class SortNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Downstream<Object[]> {
+    /** How many rows are requested by downstream. */
+    private int requested;
+
+    /** How many rows are we waiting for from the upstream. {@code -1} means end of stream. */
+    private int waiting;
+
+    /**  */
+    private boolean inLoop;
+
+    /** Rows comparator. */
+    private final Comparator<Object[]> comparator;
+
+    /** Rows buffer. */
+    private final List<Object[]> rows = new ArrayList<>();
+
+    /** Index of next row which buffer will return. {@code -1} means buffer is not sorted yet. */
+    private int curIdx = -1;
+
+    /**
+     * @param ctx Execution context.
+     * @param collation Sort collation.
+     */
+    public SortNode(ExecutionContext ctx, RelCollation collation) {
+        super(ctx);
+        this.comparator = Commons.comparator(collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Object[]> requestDownstream(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCount) {
+        checkThread();
+
+        assert !F.isEmpty(sources) && sources.size() == 1;
+        assert rowsCount > 0 && requested == 0;
+
+        requested = rowsCount;
+
+        if (waiting == -1 && !inLoop)
+            context().execute(this::flushFromBuffer);
+        else if (waiting == 0)
+            F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        else
+            throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void push(Object[] row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting--;
+
+        try {
+            rows.add(row);
+
+            if (waiting == 0)
+                F.first(sources).request(waiting = IN_BUFFER_SIZE);
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void end() {
+        checkThread();
+
+        assert downstream != null;
+        assert waiting > 0;
+
+        waiting = -1;
+
+        try {
+            flushFromBuffer();
+        }
+        catch (Exception e) {
+            downstream.onError(e);
+        }
+    }
+
+    @Override public void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+    }
+
+    /** */
+    private void flushFromBuffer() {
+        assert waiting == -1;
+
+        if (curIdx == -1 && comparator != null)
+            rows.sort(comparator);

Review comment:
       I thought about it, but it looks like in practice `List.sort` is faster [https://stackoverflow.com/a/12964351/6392181](https://stackoverflow.com/a/12964351/6392181)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] AMashenkov commented on a change in pull request #7813: IGNITE-12715: Support secondary indexes in Calcite engine.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #7813:
URL: https://github.com/apache/ignite/pull/7813#discussion_r426687629



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -342,10 +354,32 @@ public ClosableIteratorsHolder iteratorsHolder() {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("TypeMayBeWeakened")
     @Override public List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params) {
         PlanningContext pctx = createContext(ctx, schema, query, params);
 
-        return Commons.transform(prepare(pctx), p -> executeSingle(UUID.randomUUID(), pctx, p));
+        List<QueryPlan> qryPlans = prepareQueryPlan(pctx);
+
+        return executePlans(qryPlans, pctx);
+    }
+
+    /**
+     * Executes prepared plans.
+     * @param qryPlans Query plans.
+     * @param pctx Query context.
+     * @return List of query result cursors.
+     */
+    @NotNull public List<FieldsQueryCursor<List<?>>> executePlans(Collection<QueryPlan> qryPlans, PlanningContext pctx) {
+        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryPlans.size());
+
+        for (QueryPlan plan : qryPlans) {
+            UUID qryId = UUID.randomUUID();
+
+            FieldsQueryCursor<List<?>> cur = executePlan(qryId, pctx, plan);
+
+            cursors.add(cur);
+        }
+        return cursors;

Review comment:
       empty line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org