You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/06/02 15:55:55 UTC

[GitHub] [ignite] alex-plekhanov commented on a diff in pull request #10059: IGNITE-15550 SQL Calcite: implement ARRAY, ARRAY_AGG, ARRAY_CONCAT_AGG and MAP support

alex-plekhanov commented on code in PR #10059:
URL: https://github.com/apache/ignite/pull/10059#discussion_r886929514


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/fun/IgniteStdSqlOperatorTable.java:
##########
@@ -218,8 +220,8 @@ public IgniteStdSqlOperatorTable() {
         register(SqlStdOperatorTable.IS_NOT_EMPTY);
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-15550

Review Comment:
   Remove the TODO: this PR implements IGNITE-15550, so the reference is no longer needed.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/fun/IgniteStdSqlOperatorTable.java:
##########
@@ -86,6 +86,8 @@ public IgniteStdSqlOperatorTable() {
         register(SqlLibraryOperators.GROUP_CONCAT);
         register(SqlLibraryOperators.STRING_AGG);
         register(SqlStdOperatorTable.LISTAGG);
+        register(SqlLibraryOperators.ARRAY_AGG);
+        register(SqlLibraryOperators.ARRAY_CONCAT_AGG);

Review Comment:
   Let's add these functions to the documentation.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/logical/SqlScriptRunner.java:
##########
@@ -174,10 +174,24 @@ private List<List<?>> sql(String sql) {
     private static String toString(Object res) {
         if (res instanceof byte[])
             return ByteString.toString((byte[])res, 16);
+        else if (res instanceof Map)
+            return mapToString((Map<? extends Comparable, ?>)res);
         else
             return String.valueOf(res);
     }
 
+    /** */
+    private static String mapToString(Map<? extends Comparable, ?> map) {
+        if (map == null)
+            return "null";
+
+        List<String> entries = map.entrySet().stream()
+            .sorted((e1, e2) -> e1.getKey().compareTo(e2.getKey())).map(e -> e.getKey() + ":" + e.getValue())
+            .collect(Collectors.toList());
+
+        return "{" + String.join(", ", entries) + "}";
+    }

Review Comment:
   ```suggestion
       /** */
       private static String mapToString(Map<?, ?> map) {
           return new TreeMap<>(map).toString();
       }
   ```



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CollectNode.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import com.google.common.collect.Iterables;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public class CollectNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
+    /** */
+    private final Collector<Row> collector;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /**
+     * @param ctx Execution context.
+     * @param rowType Output row type.
+     */
+    public CollectNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType
+    ) {
+        super(ctx, rowType);
+
+        collector = createCollector(ctx, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        collector.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0 && requested == 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (waiting == 0)
+            source().request(waiting = IN_BUFFER_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void push(Row row) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        collector.push(row);
+
+        if (waiting == 0)
+            source().request(waiting = IN_BUFFER_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void end() throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting = -1;
+
+        if (isClosed())
+            return;
+
+        if (requested > 0) {
+            requested = 0;
+
+            downstream().push(collector.get());
+            downstream().end();
+        }
+    }
+
+    /** */
+    private static <Row> Collector<Row> createCollector(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType
+    ) {
+        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
+        RelDataType collectionType = Iterables.getOnlyElement(rowType.getFieldList()).getType();
+        RowHandler.RowFactory<Row> rowFactory = ctx.rowHandler().factory(typeFactory, rowType);
+
+        switch (collectionType.getSqlTypeName()) {
+            case ARRAY:
+                return new ArrayCollector<>(ctx.rowHandler(), rowFactory, IN_BUFFER_SIZE);
+            case MAP:
+                return new MapCollector<>(ctx.rowHandler(), rowFactory, IN_BUFFER_SIZE);
+            default:
+                throw new RuntimeException("Unsupported collectionType: " + collectionType.getSqlTypeName());
+        }
+    }
+
+    /** */
+    private abstract static class Collector<Row> implements Supplier<Row> {
+        /** */
+        protected final RowHandler<Row> rowHandler;
+
+        /** */
+        protected final RowHandler.RowFactory<Row> rowFactory;
+
+        /** */
+        protected final int cap;
+
+        /** */
+        Collector(
+            RowHandler<Row> handler,
+            RowHandler.RowFactory<Row> factory,
+            int cap
+        ) {
+            rowHandler = handler;
+            rowFactory = factory;
+            this.cap = cap;
+        }
+
+        /** */
+        public abstract void push(Row row);
+
+        /** */
+        public abstract void clear();
+
+        /** */
+        protected abstract Object outData();
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            Row out = rowFactory.create();
+
+            rowHandler.set(0, out, outData());
+            return out;
+        }
+    }
+
+    /** */
+    private static class MapCollector<Row> extends Collector<Row> {
+        /** */
+        private final Map<Object, Object> outBuf;
+
+        /** */
+        private MapCollector(
+            RowHandler<Row> handler,
+            RowHandler.RowFactory<Row> rowFactory,
+            int cap
+        ) {
+            super(handler, rowFactory, cap);
+            outBuf = new LinkedHashMap<>(cap);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Object outData() {
+            return Collections.unmodifiableMap(outBuf);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void push(Row row) {
+            outBuf.put(rowHandler.get(0, row), rowHandler.get(1, row));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clear() {
+            outBuf.clear();
+        }
+    }
+
+    /** */
+    private static class ArrayCollector<Row> extends Collector<Row> {
+        /** */
+        private final List<Object> outBuf;
+
+        /** */
+        private ArrayCollector(
+            RowHandler<Row> handler,
+            RowHandler.RowFactory<Row> rowFactory,
+            int cap
+        ) {
+            super(handler, rowFactory, cap);
+            outBuf = new ArrayList<>(cap);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Object outData() {
+            return Collections.unmodifiableList(outBuf);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void push(Row row) {
+            outBuf.add(rowHandler.get(0, row));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clear() {
+            outBuf.clear();

Review Comment:
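   I don't think it's correct to return an unmodifiable view over the same instance that is later cleared. The view is backed by `outBuf`, so clearing it on rewind also empties the collection already pushed downstream. `outBuf` should be recreated on each `clear()` call.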
   I've tried to write a reproducer with CNLJ, but unfortunately ran into some other errors during planning. Something like this should reproduce the problem:
   ```
   query T
   SELECT ARRAY(SELECT b FROM t1 t1_1 WHERE t1_1.a = t1_2.a) FROM t1 t1_2 GROUP BY a ORDER BY a
   ----
   [10, 22]
   [21]
   ```
   This reproducer also doesn't work yet, because passThroughTraits/deriveTraits are not implemented for IgniteCollect:
   ```
   query T
   SELECT ARRAY(SELECT b FROM t1 t1_1 WHERE t1_1.a = t1_2.a) FROM t1 t1_2 ORDER BY a
   ----
   [10, 22]
   [10, 22]
   [21]
   ```
   



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -1158,6 +1191,71 @@ private String extractSeparator(Row row) {
         }
     }
 
+    /** */
+    private static class ArrayAggregateAccumulator<Row> extends AggAccumulator<Row> {
+        /** */
+        public ArrayAggregateAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
+            super(aggCall, hnd);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            List<Object> result = new ArrayList<>(size());
+
+            for (Row row: this)
+                result.add(get(0, row));

Review Comment:
   Do we really need to copy the array? Is it reused somehow after `end()`?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCollect.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Collect;
+import org.apache.calcite.rel.type.RelDataType;
+
+/** */
+public class IgniteCollect extends Collect implements IgniteRel {
+    /**
+     * Creates a <code>SingleRel</code>.
+     *
+     * @param cluster Cluster this relational expression belongs to
+     * @param traits
+     * @param input Input relational expression
+     */
+    public IgniteCollect(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        RelDataType rowType
+    ) {
+        super(cluster, traits, input, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteCollect(cluster, getTraitSet(), sole(inputs), rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, RelNode input) {
+        return new IgniteCollect(getCluster(), traitSet, input, rowType());
+    }
+}

Review Comment:
   ```suggestion
   
       /** {@inheritDoc} */
       @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(RelTraitSet required) {
           if (required.getConvention() != IgniteConvention.INSTANCE)
               return null;
   
           if (TraitUtils.distribution(required) != IgniteDistributions.single())
               return null;
   
           return Pair.of(required, ImmutableList.of(required));
       }
   
       /** {@inheritDoc} */
       @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(RelTraitSet childTraits, int childId) {
           assert childId == 0;
   
           if (childTraits.getConvention() != IgniteConvention.INSTANCE)
               return null;
   
           if (TraitUtils.distribution(childTraits) != IgniteDistributions.single())
               return null;
   
           return Pair.of(childTraits, ImmutableList.of(childTraits));
       }
   }
   ```



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/StdSqlOperatorsTest.java:
##########
@@ -101,6 +104,21 @@ public void testAggregates() {
         assertExpression("LISTAGG(val, ',') WITHIN GROUP (ORDER BY val DESC)").returns("1").check();
         assertExpression("GROUP_CONCAT(val, ',' ORDER BY val DESC)").returns("1").check();
         assertExpression("STRING_AGG(val, ',' ORDER BY val DESC)").returns("1").check();
+        assertExpression("ARRAY_AGG(val ORDER BY val DESC)").returns(Collections.singletonList(1)).check();
+        assertQuery("SELECT ARRAY_CONCAT_AGG(a ORDER BY CARDINALITY(a)) FROM " +
+            "(SELECT 1 as id, ARRAY[3, 4, 5, 6] as a UNION SELECT 1 as id, ARRAY[1, 2] as a) GROUP BY id")
+            .returns((IntStream.range(1, 7).boxed().collect(Collectors.toList()))).check();
+    }
+
+    /** */
+    @Test
+    public void testCollect() {
+        assertExpression("ARRAY(SELECT * FROM (SELECT 1 UNION SELECT 2 UNION SELECT 3))")
+            .returns(IntStream.range(1, 4).boxed().collect(Collectors.toList())).check();
+
+        assertExpression("MAP(SELECT * FROM (SELECT 1, 'test1' UNION SELECT 2, 'test2' UNION SELECT 3, 'test3'))")
+            .returns(IntStream.range(1, 4).mapToObj(i -> new T2<>(i, "test" + i))
+                .collect(Collectors.toMap(T2::getKey, T2::getValue))).check();

Review Comment:
   There is already `testQueryAsCollections` in this class; let's un-ignore it instead.



##########
modules/calcite/src/test/sql/collect/test_collect.test:
##########
@@ -0,0 +1,50 @@
+# name: test/sql/collect/test_collect.test
+# description: Test to check ARRAY and MAP creation from query
+# group: [collect]
+
+statement ok
+CREATE TABLE T1 (a INTEGER, b INTEGER);
+
+statement ok
+INSERT INTO T1 VALUES (11, 22), (11, 10), (12, 21);
+
+query T
+SELECT ARRAY_AGG(b ORDER BY b) as c FROM T1 GROUP BY a ORDER BY a
+----
+[10, 22]
+[21]
+
+query T
+SELECT ARRAY(SELECT * FROM (SELECT 1 UNION SELECT 2 UNION SELECT 3))
+----
+[1, 2, 3]
+
+query T
+SELECT ARRAY[1, 2, 3]
+----
+[1, 2, 3]
+
+query T
+SELECT ARRAY_CONCAT_AGG(a) FROM (SELECT 1 as id, ARRAY[1, 2, 3] as a UNION SELECT 1 as id, ARRAY[4, 5, 6] as a) GROUP BY id
+----
+[1, 2, 3, 4, 5, 6]
+
+query T
+SELECT ARRAY_CONCAT_AGG(a ORDER BY rate DESC) FROM (SELECT 1 as id, ARRAY[1, 2, 3] as a, 2 as rate UNION SELECT 1 as id, ARRAY[4, 5, 6] as a, 1 as rate) GROUP BY id
+----
+[1, 2, 3, 4, 5, 6]
+
+query T
+SELECT ARRAY_CONCAT_AGG(a ORDER BY CARDINALITY(a)) FROM (SELECT 1 as id, ARRAY[3, 4, 5, 6] as a UNION SELECT 1 as id, ARRAY[1, 2] as a) GROUP BY id

Review Comment:
   Let's use some other numbers here so that the expected result is not sorted by value. That ensures the test checks ordering by rate and cardinality rather than by array item value.



##########
modules/calcite/src/test/sql/collect/test_collect.test:
##########
@@ -0,0 +1,50 @@
+# name: test/sql/collect/test_collect.test
+# description: Test to check ARRAY and MAP creation from query
+# group: [collect]
+
+statement ok
+CREATE TABLE T1 (a INTEGER, b INTEGER);
+
+statement ok
+INSERT INTO T1 VALUES (11, 22), (11, 10), (12, 21);
+
+query T
+SELECT ARRAY_AGG(b ORDER BY b) as c FROM T1 GROUP BY a ORDER BY a

Review Comment:
   There are already some array_agg tests (types/list/array_agg.test_ignore). Let's un-ignore them and check them.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -98,7 +101,14 @@ private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
     ) {
         RowHandler<Row> hnd = ctx.rowHandler();
 
-        Supplier<Accumulator<Row>> accSup = () -> new ListAggAccumulator<>(call, hnd);
+        Supplier<Accumulator<Row>> accSup;
+        String aggName = call.getAggregation().getName();
+        if ("LISTAGG".equals(aggName))
+            accSup = () -> new ListAggAccumulator<>(call, hnd);
+        else if ("ARRAY_CONCAT_AGG".equals(aggName))
+            accSup = () -> new ArrayConcatAggregateAccumulator<>(call, hnd);
+        else
+            accSup = () -> new ArrayAggregateAccumulator<>(call, hnd);

Review Comment:
   ```suggestion
           else if ("ARRAY_AGG".equals(aggName))
               accSup = () -> new ArrayAggregateAccumulator<>(call, hnd);
           else
               throw new AssertionError(call.getAggregation().getName());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org