You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/26 10:32:08 UTC

[GitHub] [ignite] alex-plekhanov commented on a diff in pull request #10023: IGNITE-14636 SQL Calcite: implement STRING_AGG, GROUP_CONCAT and LIST…

alex-plekhanov commented on code in PR #10023:
URL: https://github.com/apache/ignite/pull/10023#discussion_r881800535


##########
modules/calcite/src/main/codegen/config.fmpp:
##########
@@ -519,6 +519,8 @@ data: {
       "SYSTEM"
       "SYSTEM_TIME"
       "SYSTEM_USER"
+      "STRING_AGG"
+      "GROUP_CONCAT"

Review Comment:
   Can you please also remove `UUID` from the `keywords` array? It's not related to this ticket, but it was added by mistake, and it causes too many changes in the generated parser for such a minor change.
   



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) {
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;
+        if (mapColl != null) {
+            cmp = ctx.expressionFactory().comparator(getMappedCollation(call));
+
+            return () -> new SortingAccumulator<>(accSup, cmp);
+        }
+
+        return accSup;
+    }
+
+    /** */
+    private static RelCollation getMappedCollation(AggregateCall call) {
+        if (call.getCollation() == null || call.getCollation().getFieldCollations().isEmpty())
+            return null;
+
+        List<RelFieldCollation> collations = call.getCollation().getFieldCollations();
+        List<Integer> argList = call.getArgList();
+
+        // The target value will be accessed by field index in mapping array (targets[fieldIndex]),
+        // so srcCnt should be "max_field_index + 1" to prevent IndexOutOfBoundsException.
+        int srcCnt = Collections.max(collations, Comparator.comparingInt(RelFieldCollation::getFieldIndex))
+            .getFieldIndex() + 1;
+
+        Map<Integer, Integer> mapping = new HashMap<>();
+
+        int collOff = 0;
+        for (int i = 0; i < collations.size(); i++) {
+            int idx = collations.get(i).getFieldIndex();
+
+            int mapIdx = argList.indexOf(idx);
+            if (mapIdx == -1) { //collation not found in arglist
+                mapIdx = argList.size() + collOff;
+
+                collOff++;
+            }
+
+            mapping.put(idx, mapIdx);
+        }
+
+        return mapping.isEmpty() ? call.getCollation() : call.getCollation()
+            .apply(Mappings.target(mapping, srcCnt, srcCnt));
+    }
+
+    /** */
+    private static <Row> Supplier<Accumulator<Row>> avgFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("AVG() is not supported for type '" + call.type + "'.");
             case BIGINT:
             case DECIMAL:
-                return DecimalAvg.FACTORY;
+                return () -> new DecimalAvg<>(hnd);
             case DOUBLE:
             case REAL:
             case FLOAT:
             case INTEGER:
             default:
-                return DoubleAvg.FACTORY;
+                return () -> new DoubleAvg<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> sumFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return () -> new Sum(new DecimalSumEmptyIsZero());
+                return () -> new Sum<>(new DecimalSumEmptyIsZero<>(hnd), hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return () -> new Sum(new DoubleSumEmptyIsZero());
+                return () -> new Sum<>(new DoubleSumEmptyIsZero<>(hnd), hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return () -> new Sum(new LongSumEmptyIsZero());
+                return () -> new Sum<>(new LongSumEmptyIsZero<>(hnd), hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumEmptyIsZeroFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> sumEmptyIsZeroFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return DecimalSumEmptyIsZero.FACTORY;
+                return () -> new DecimalSumEmptyIsZero<>(hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleSumEmptyIsZero.FACTORY;
+                return () -> new DoubleSumEmptyIsZero<>(hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return LongSumEmptyIsZero.FACTORY;
+                return () -> new LongSumEmptyIsZero<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> minFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> minFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MIN_FACTORY;
+                return () -> new DoubleMinMax<>(true, hnd);
             case DECIMAL:
-                return DecimalMinMax.MIN_FACTORY;
+                return () -> new DecimalMinMax<>(true, hnd);
             case INTEGER:
-                return IntMinMax.MIN_FACTORY;
+                return () -> new IntMinMax<>(true, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MIN_FACTORY;
+                return () -> new VarCharMinMax<>(true, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MIN_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(true,
+                    tf -> tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MIN_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(true,
+                        tf -> tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MIN() is not supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MIN_FACTORY;
+                return () -> new LongMinMax<>(true, hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> maxFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> maxFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MAX_FACTORY;
+                return () -> new DoubleMinMax<>(false, hnd);
             case DECIMAL:
-                return DecimalMinMax.MAX_FACTORY;
+                return () -> new DecimalMinMax<>(false, hnd);
             case INTEGER:
-                return IntMinMax.MAX_FACTORY;
+                return () -> new IntMinMax<>(false, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MAX_FACTORY;
+                return () -> new VarCharMinMax<>(false, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MAX_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(false,
+                    tf -> tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MAX_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(false,
+                        tf -> tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MAX() is not supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MAX_FACTORY;
+                return () -> new LongMinMax<>(false, hnd);
         }
     }
 
     /** */
-    private static class SingleVal extends AnyVal {
+    private abstract static class AbstractAccumulator<Row> implements Accumulator<Row> {
+        /** */
+        private final RowHandler<Row> hnd;
+
+        /** */
+        AbstractAccumulator(RowHandler<Row> hnd) {
+            this.hnd = hnd;
+        }
+
+        /** */
+        <T> T get(int idx, Row row) {
+            return (T)hnd.get(idx, row);
+        }
+
+        /** */
+        <T> void set(int idx, Row row, T val) {

Review Comment:
   Looks like this method is not used.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) {
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;
+        if (mapColl != null) {
+            cmp = ctx.expressionFactory().comparator(getMappedCollation(call));

Review Comment:
   `getMappedCollation(call)` -> `mapColl`



##########
modules/calcite/src/test/sql/aggregate/aggregates/test_string_agg.test:
##########
@@ -0,0 +1,153 @@
+# name: test/sql/aggregate/aggregates/test_string_agg.test
+# description: Test STRING_AGG operator
+# group: [aggregates]
+
+# test incorrect usage of STRING_AGG function
+
+statement error
+SELECT STRING_AGG()
+
+statement error
+SELECT STRING_AGG(1, 2, 3)
+
+statement error
+SELECT STRING_AGG(STRING_AGG('a', ','))
+
+# test string aggregation on scalar values
+query T
+SELECT STRING_AGG('a', ',')
+----
+a
+
+# test string aggregation on scalar values
+query TTTT
+SELECT STRING_AGG('a', ','), STRING_AGG(NULL, ','), STRING_AGG('a', NULL), STRING_AGG(NULL, NULL)
+----
+a
+NULL
+a

Review Comment:
   Why do we have a non-NULL value here? Do other databases return the same result when the separator is NULL?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) {
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;
+        if (mapColl != null) {
+            cmp = ctx.expressionFactory().comparator(getMappedCollation(call));
+
+            return () -> new SortingAccumulator<>(accSup, cmp);
+        }
+
+        return accSup;
+    }
+
+    /** */
+    private static RelCollation getMappedCollation(AggregateCall call) {
+        if (call.getCollation() == null || call.getCollation().getFieldCollations().isEmpty())
+            return null;
+
+        List<RelFieldCollation> collations = call.getCollation().getFieldCollations();
+        List<Integer> argList = call.getArgList();
+
+        // The target value will be accessed by field index in mapping array (targets[fieldIndex]),
+        // so srcCnt should be "max_field_index + 1" to prevent IndexOutOfBoundsException.
+        int srcCnt = Collections.max(collations, Comparator.comparingInt(RelFieldCollation::getFieldIndex))
+            .getFieldIndex() + 1;
+
+        Map<Integer, Integer> mapping = new HashMap<>();
+
+        int collOff = 0;
+        for (int i = 0; i < collations.size(); i++) {
+            int idx = collations.get(i).getFieldIndex();
+
+            int mapIdx = argList.indexOf(idx);
+            if (mapIdx == -1) { //collation not found in arglist
+                mapIdx = argList.size() + collOff;
+
+                collOff++;
+            }
+
+            mapping.put(idx, mapIdx);
+        }
+
+        return mapping.isEmpty() ? call.getCollation() : call.getCollation()
+            .apply(Mappings.target(mapping, srcCnt, srcCnt));
+    }
+
+    /** */
+    private static <Row> Supplier<Accumulator<Row>> avgFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("AVG() is not supported for type '" + call.type + "'.");
             case BIGINT:
             case DECIMAL:
-                return DecimalAvg.FACTORY;
+                return () -> new DecimalAvg<>(hnd);
             case DOUBLE:
             case REAL:
             case FLOAT:
             case INTEGER:
             default:
-                return DoubleAvg.FACTORY;
+                return () -> new DoubleAvg<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> sumFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return () -> new Sum(new DecimalSumEmptyIsZero());
+                return () -> new Sum<>(new DecimalSumEmptyIsZero<>(hnd), hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return () -> new Sum(new DoubleSumEmptyIsZero());
+                return () -> new Sum<>(new DoubleSumEmptyIsZero<>(hnd), hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return () -> new Sum(new LongSumEmptyIsZero());
+                return () -> new Sum<>(new LongSumEmptyIsZero<>(hnd), hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumEmptyIsZeroFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> sumEmptyIsZeroFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return DecimalSumEmptyIsZero.FACTORY;
+                return () -> new DecimalSumEmptyIsZero<>(hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleSumEmptyIsZero.FACTORY;
+                return () -> new DoubleSumEmptyIsZero<>(hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return LongSumEmptyIsZero.FACTORY;
+                return () -> new LongSumEmptyIsZero<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> minFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> minFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MIN_FACTORY;
+                return () -> new DoubleMinMax<>(true, hnd);
             case DECIMAL:
-                return DecimalMinMax.MIN_FACTORY;
+                return () -> new DecimalMinMax<>(true, hnd);
             case INTEGER:
-                return IntMinMax.MIN_FACTORY;
+                return () -> new IntMinMax<>(true, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MIN_FACTORY;
+                return () -> new VarCharMinMax<>(true, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MIN_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(true,
+                    tf -> tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MIN_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(true,
+                        tf -> tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MIN() is not supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MIN_FACTORY;
+                return () -> new LongMinMax<>(true, hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> maxFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> maxFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MAX_FACTORY;
+                return () -> new DoubleMinMax<>(false, hnd);
             case DECIMAL:
-                return DecimalMinMax.MAX_FACTORY;
+                return () -> new DecimalMinMax<>(false, hnd);
             case INTEGER:
-                return IntMinMax.MAX_FACTORY;
+                return () -> new IntMinMax<>(false, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MAX_FACTORY;
+                return () -> new VarCharMinMax<>(false, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MAX_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(false,
+                    tf -> tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MAX_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(false,
+                        tf -> tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MAX() is not supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MAX_FACTORY;
+                return () -> new LongMinMax<>(false, hnd);
         }
     }
 
     /** */
-    private static class SingleVal extends AnyVal {
+    private abstract static class AbstractAccumulator<Row> implements Accumulator<Row> {
+        /** */
+        private final RowHandler<Row> hnd;
+
+        /** */
+        AbstractAccumulator(RowHandler<Row> hnd) {
+            this.hnd = hnd;
+        }
+
+        /** */
+        <T> T get(int idx, Row row) {
+            return (T)hnd.get(idx, row);
+        }
+
+        /** */
+        <T> void set(int idx, Row row, T val) {
+            hnd.set(idx, row, val);
+        }
+
+        /** */
+        int columnCount(Row row) {
+            return hnd.columnCount(row);
+        }
+
+        /** */
+        Row createRow(IgniteTypeFactory typeFactory, List<RelDataType> fieldTypes) {

Review Comment:
   Looks like this method is not used.



##########
modules/calcite/src/test/sql/aggregate/aggregates/test_perfect_ht.test_ignore:
##########
@@ -25,15 +24,6 @@ NULL	1	1	1
 1997	12	1	1
 2001	30	1	1
 
-# use aggregates with destructors

Review Comment:
   Test cases should not be removed from the test_ignore file until all test cases are green and we can remove the test_ignore file entirely.
   
   In this test, it looks like we could also implement ARRAY_AGG (in a separate ticket) as a replacement for the LIST aggregate function.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/fun/IgniteStdSqlOperatorTable.java:
##########
@@ -83,6 +83,9 @@ public IgniteStdSqlOperatorTable() {
         register(SqlStdOperatorTable.ANY_VALUE);
         register(SqlStdOperatorTable.SINGLE_VALUE);
         register(SqlStdOperatorTable.FILTER);
+        register(SqlLibraryOperators.GROUP_CONCAT);

Review Comment:
   Let's also add basic tests for these functions to `StdSqlOperatorsTest` (one test per function), and add the newly supported functions to the `sql-calcite.adoc` file.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -972,39 +1062,170 @@ private ComparableMinMax(boolean min, Function<IgniteTypeFactory, RelDataType> t
     }
 
     /** */
-    private static class DistinctAccumulator implements Accumulator {
+    private static class SortingAccumulator<Row> implements Accumulator<Row> {
         /** */
-        private final Accumulator acc;
+        private final transient Comparator<Row> cmp;
 
         /** */
-        private final Set<Object> set = new HashSet<>();
+        private final List<Row> list;
 
         /** */
-        private DistinctAccumulator(Supplier<Accumulator> accSup) {
-            this.acc = accSup.get();
+        private final Accumulator<Row> acc;
+
+        /**
+         * @param accSup Acc support.
+         * @param cmp Comparator.
+         */
+        private SortingAccumulator(Supplier<Accumulator<Row>> accSup, Comparator<Row> cmp) {
+            this.cmp = cmp;
+
+            list = new ArrayList<>();
+            acc = accSup.get();
         }
 
         /** {@inheritDoc} */
-        @Override public void add(Object... args) {
-            Object in = args[0];
+        @Override public void add(Row row) {
+            list.add(row);
+        }
 
-            if (in == null)
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator<Row> other) {
+            SortingAccumulator<Row> other1 = (SortingAccumulator<Row>)other;
+
+            list.addAll(other1.list);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            list.sort(cmp);
+
+            for (Row row : list)
+                acc.add(row);
+
+            return acc.end();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
+            return acc.argumentTypes(typeFactory);
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) {
+            return acc.returnType(typeFactory);
+        }
+    }
+
+    /** */
+    private static class ListAggAccumulator<Row> extends AbstractAccumulator<Row> {
+        /** Default separator. */
+        private static final String DEFAULT_SEPARATOR = ",";
+
+        /** */
+        private final List<Row> list;
+
+        /** */
+        private final int sepIdx;
+
+        /** */
+        public ListAggAccumulator(AggregateCall call, RowHandler<Row> hnd) {
+            super(hnd);
+
+            sepIdx = call.getArgList().size() > 1 ? 1 : 0;
+
+            list = new ArrayList<>();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void add(Row row) {
+            if (row == null || get(0, row) == null)
+                return;
+
+            list.add(row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator<Row> other) {
+            ListAggAccumulator<Row> other0 = (ListAggAccumulator<Row>)other;
+
+            list.addAll(other0.list);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            if (list.isEmpty())
+                return null;
+
+            StringBuilder builder = new StringBuilder();
+
+            for (Row row: list) {
+                if (builder.length() != 0)
+                    builder.append(extractSeparator(row));
+
+                builder.append(Objects.toString(get(0, row)));
+            }
+
+            return builder.toString();
+        }
+
+        /** */
+        private String extractSeparator(Row row) {
+            if (sepIdx < 1 || columnCount(row) <= sepIdx)
+                return DEFAULT_SEPARATOR;
+
+            Object rawSep = get(sepIdx, row);
+
+            if (rawSep == null)
+                return DEFAULT_SEPARATOR;
+
+            return rawSep.toString();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
+            return F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARCHAR), true),
+                typeFactory.createTypeWithNullability(typeFactory.createSqlType(CHAR), true));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) {
+            return typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARCHAR), true);
+        }
+    }
+
+    /** */
+    private static class DistinctAccumulator<Row> extends AbstractAccumulator<Row> {
+        /** */
+        private final Accumulator<Row> acc;
+
+        /** */
+        private final Map<Object, Row> rows = new LinkedHashMap<>();
+
+        /** */
+        private DistinctAccumulator(Supplier<Accumulator<Row>> accSup, RowHandler<Row> hnd) {
+            super(hnd);
+            acc = accSup.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void add(Row row) {
+            if (row == null || columnCount(row) == 0)

Review Comment:
   Shouldn't we also check the first value for `null` (`|| get(0, row) == null`)?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) {
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;

Review Comment:
   Move the `cmp` declaration inside the `if` block.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -972,39 +1062,170 @@ private ComparableMinMax(boolean min, Function<IgniteTypeFactory, RelDataType> t
     }
 
     /** */
-    private static class DistinctAccumulator implements Accumulator {
+    private static class SortingAccumulator<Row> implements Accumulator<Row> {
         /** */
-        private final Accumulator acc;
+        private final transient Comparator<Row> cmp;
 
         /** */
-        private final Set<Object> set = new HashSet<>();
+        private final List<Row> list;
 
         /** */
-        private DistinctAccumulator(Supplier<Accumulator> accSup) {
-            this.acc = accSup.get();
+        private final Accumulator<Row> acc;
+
+        /**
+         * @param accSup Acc support.

Review Comment:
   `Acc support` -> `Accumulator supplier`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -972,39 +1062,170 @@ private ComparableMinMax(boolean min, Function<IgniteTypeFactory, RelDataType> t
     }
 
     /** */
-    private static class DistinctAccumulator implements Accumulator {
+    private static class SortingAccumulator<Row> implements Accumulator<Row> {
         /** */
-        private final Accumulator acc;
+        private final transient Comparator<Row> cmp;
 
         /** */
-        private final Set<Object> set = new HashSet<>();
+        private final List<Row> list;
 
         /** */
-        private DistinctAccumulator(Supplier<Accumulator> accSup) {
-            this.acc = accSup.get();
+        private final Accumulator<Row> acc;
+
+        /**
+         * @param accSup Acc support.
+         * @param cmp Comparator.
+         */
+        private SortingAccumulator(Supplier<Accumulator<Row>> accSup, Comparator<Row> cmp) {
+            this.cmp = cmp;
+
+            list = new ArrayList<>();
+            acc = accSup.get();
         }
 
         /** {@inheritDoc} */
-        @Override public void add(Object... args) {
-            Object in = args[0];
+        @Override public void add(Row row) {
+            list.add(row);
+        }
 
-            if (in == null)
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator<Row> other) {
+            SortingAccumulator<Row> other1 = (SortingAccumulator<Row>)other;
+
+            list.addAll(other1.list);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            list.sort(cmp);
+
+            for (Row row : list)
+                acc.add(row);
+
+            return acc.end();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
+            return acc.argumentTypes(typeFactory);
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) {
+            return acc.returnType(typeFactory);
+        }
+    }
+
+    /** */
+    private static class ListAggAccumulator<Row> extends AbstractAccumulator<Row> {
+        /** Default separator. */
+        private static final String DEFAULT_SEPARATOR = ",";
+
+        /** */
+        private final List<Row> list;
+
+        /** */
+        private final int sepIdx;
+
+        /** */
+        public ListAggAccumulator(AggregateCall call, RowHandler<Row> hnd) {
+            super(hnd);
+
+            sepIdx = call.getArgList().size() > 1 ? 1 : 0;

Review Comment:
   It looks like a boolean flag would be clearer here: currently we encode a condition into an integer and later decode it to determine whether the default separator should be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org