You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by ho...@apache.org on 2019/03/07 03:26:30 UTC

[calcite] branch master updated: [CALCITE-2785] In EnumerableAggregate, wrong result produced if there are sorted aggregates and non-sorted aggregates at the same time

This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new f992e6c  [CALCITE-2785] In EnumerableAggregate, wrong result produced if there are sorted aggregates and non-sorted aggregates at the same time
f992e6c is described below

commit f992e6c8fbc3712c6bd70096fb6b05da448b2cab
Author: hongzezhang <ho...@tencent.com>
AuthorDate: Thu Feb 28 11:38:42 2019 +0800

    [CALCITE-2785] In EnumerableAggregate, wrong result produced if there are sorted aggregates and non-sorted aggregates at the same time
    
    Rename SequencedAdderAggregateLambdaFactory to BasicAggregateLambdaFactory;
    Rename OrderedAggregateLambdaFactory to LazyAggregateLambdaFactory;
    Add BasicLazyAccumulator.
---
 ...ctory.java => BasicAggregateLambdaFactory.java} |  6 +--
 .../adapter/enumerable/BasicLazyAccumulator.java   | 45 ++++++++++++++++++++++
 .../adapter/enumerable/EnumerableAggregate.java    | 18 ++++++---
 ...actory.java => LazyAggregateLambdaFactory.java} | 40 +++++++++++--------
 .../calcite/adapter/enumerable/SourceSorter.java   | 17 +++++---
 .../org/apache/calcite/util/BuiltInMethod.java     | 10 +++--
 core/src/test/resources/sql/agg.iq                 | 21 ++++++++++
 7 files changed, 124 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicAggregateLambdaFactory.java
similarity index 94%
rename from core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java
rename to core/src/main/java/org/apache/calcite/adapter/enumerable/BasicAggregateLambdaFactory.java
index 4b4577c..ee67523 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicAggregateLambdaFactory.java
@@ -31,13 +31,13 @@ import java.util.List;
  * @param <TResult> Type of the enumerable output result
  * @param <TKey> Type of the group-by key
  */
-public class SequencedAdderAggregateLambdaFactory<TSource, TAccumulate, TResult, TKey>
+public class BasicAggregateLambdaFactory<TSource, TAccumulate, TResult, TKey>
     implements AggregateLambdaFactory<TSource, TAccumulate, TAccumulate, TResult, TKey> {
 
   private final Function0<TAccumulate> accumulatorInitializer;
   private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdderDecorator;
 
-  public SequencedAdderAggregateLambdaFactory(
+  public BasicAggregateLambdaFactory(
       Function0<TAccumulate> accumulatorInitializer,
       List<Function2<TAccumulate, TSource, TAccumulate>> accumulatorAdders) {
     this.accumulatorInitializer = accumulatorInitializer;
@@ -85,4 +85,4 @@ public class SequencedAdderAggregateLambdaFactory<TSource, TAccumulate, TResult,
   }
 }
 
-// End SequencedAdderAggregateLambdaFactory.java
+// End BasicAggregateLambdaFactory.java
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicLazyAccumulator.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicLazyAccumulator.java
new file mode 100644
index 0000000..4374cca
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicLazyAccumulator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.function.Function2;
+
+/**
+ * Performs accumulation against a pre-collected list of input sources,
+ * used with {@link LazyAggregateLambdaFactory}.
+ *
+ * @param <TAccumulate> Type of the accumulator
+ * @param <TSource>     Type of the enumerable input source
+ */
+public class BasicLazyAccumulator<TAccumulate, TSource>
+    implements LazyAggregateLambdaFactory.LazyAccumulator<TAccumulate, TSource> {
+
+  private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+
+  public BasicLazyAccumulator(Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder) {
+    this.accumulatorAdder = accumulatorAdder;
+  }
+
+  @Override public void accumulate(Iterable<TSource> sourceIterable, TAccumulate accumulator) {
+    TAccumulate accumulator1 = accumulator; // running value, seeded with the caller's object
+    for (TSource tSource : sourceIterable) {
+      accumulator1 = accumulatorAdder.apply(accumulator1, tSource); // fold in iteration order
+    }
+  } // NOTE(review): accumulator1 is discarded; presumably the adder mutates in place
+}
+
+// End BasicLazyAccumulator.java
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
index 3130bd0..aa33685 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
@@ -485,11 +485,11 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
    * <ul>
    *
    * <li>{@code hasOrderedCall == true} means there is at least one aggregate
-   * call including sort spec. We use {@link OrderedAggregateLambdaFactory}
+   * call including sort spec. We use {@link LazyAggregateLambdaFactory}
    * implementation to implement sorted aggregates for that.
    *
    * <li>{@code hasOrderedCall == false} indicates to use
-   * {@link SequencedAdderAggregateLambdaFactory} to implement a non-sort
+   * {@link BasicAggregateLambdaFactory} to implement a non-sort
    * aggregate.
    *
    * </ul>
@@ -502,12 +502,20 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
       ParameterExpression lambdaFactory) {
     if (hasOrderedCall) {
       ParameterExpression pe = Expressions.parameter(List.class,
-          builder.newName("sourceSorters"));
+          builder.newName("lazyAccumulators"));
       builder.add(
           Expressions.declare(0, pe, Expressions.new_(LinkedList.class)));
 
       for (AggImpState agg : aggs) {
         if (agg.call.collation.equals(RelCollations.EMPTY)) {
+          // If the call does not require ordering, fall back to
+          // a non-sorted lazy accumulator.
+          builder.add(
+              Expressions.statement(
+                  Expressions.call(pe,
+                      BuiltInMethod.COLLECTION_ADD.method,
+                      Expressions.new_(BuiltInMethod.BASIC_LAZY_ACCUMULATOR.constructor,
+                          agg.accumulatorAdder))));
           continue;
         }
         final Pair<Expression, Expression> pair =
@@ -523,7 +531,7 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
       builder.add(
           Expressions.declare(0, lambdaFactory,
               Expressions.new_(
-                  BuiltInMethod.ORDERED_AGGREGATE_LAMBDA_FACTORY.constructor,
+                  BuiltInMethod.LAZY_AGGREGATE_LAMBDA_FACTORY.constructor,
                   accumulatorInitializer, pe)));
     } else {
       // when hasOrderedCall == false
@@ -541,7 +549,7 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
       builder.add(
           Expressions.declare(0, lambdaFactory,
               Expressions.new_(
-                  BuiltInMethod.SEQUENCED_ADDER_AGGREGATE_LAMBDA_FACTORY.constructor,
+                  BuiltInMethod.BASIC_AGGREGATE_LAMBDA_FACTORY.constructor,
                   accumulatorInitializer, pe)));
     }
   }
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/LazyAggregateLambdaFactory.java
similarity index 69%
rename from core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java
rename to core/src/main/java/org/apache/calcite/adapter/enumerable/LazyAggregateLambdaFactory.java
index 3c7c72a..aa11ca1 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/LazyAggregateLambdaFactory.java
@@ -25,28 +25,27 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * Generate aggregate lambdas that sorts the input source before calling each
- * aggregate adder.
+ * Generate aggregate lambdas that preserve the input source before calling each
+ * aggregate adder. This implementation is generally used when the input needs to be
+ * sorted before performing aggregation.
  *
  * @param <TSource> Type of the enumerable input source
  * @param <TKey> Type of the group-by key
- * @param <TSortKey> Type of the sort key
  * @param <TOrigAccumulate> Type of the original accumulator
  * @param <TResult> Type of the enumerable output result
  */
-public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
-    TOrigAccumulate, TResult>
+public class LazyAggregateLambdaFactory<TSource, TKey, TOrigAccumulate, TResult>
     implements AggregateLambdaFactory<TSource, TOrigAccumulate,
-    OrderedAggregateLambdaFactory.LazySource<TSource>, TResult, TKey> {
+    LazyAggregateLambdaFactory.LazySource<TSource>, TResult, TKey> {
 
   private final Function0<TOrigAccumulate> accumulatorInitializer;
-  private final List<SourceSorter<TOrigAccumulate, TSource, TSortKey>> sourceSorters;
+  private final List<LazyAccumulator<TOrigAccumulate, TSource>> accumulators;
 
-  public OrderedAggregateLambdaFactory(
+  public LazyAggregateLambdaFactory(
       Function0<TOrigAccumulate> accumulatorInitializer,
-      List<SourceSorter<TOrigAccumulate, TSource, TSortKey>> sourceSorters) {
+      List<LazyAccumulator<TOrigAccumulate, TSource>> accumulators) {
     this.accumulatorInitializer = accumulatorInitializer;
-    this.sourceSorters = sourceSorters;
+    this.accumulators = accumulators;
   }
 
   public Function0<LazySource<TSource>> accumulatorInitializer() {
@@ -65,8 +64,8 @@ public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
       Function1<TOrigAccumulate, TResult> resultSelector) {
     return lazySource -> {
       final TOrigAccumulate accumulator = accumulatorInitializer.apply();
-      for (SourceSorter<TOrigAccumulate, TSource, TSortKey> acc : sourceSorters) {
-        acc.sortAndAccumulate(lazySource, accumulator);
+      for (LazyAccumulator<TOrigAccumulate, TSource> acc : accumulators) {
+        acc.accumulate(lazySource, accumulator);
       }
       return resultSelector.apply(accumulator);
     };
@@ -76,15 +75,15 @@ public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
       Function2<TKey, TOrigAccumulate, TResult> resultSelector) {
     return (groupByKey, lazySource) -> {
       final TOrigAccumulate accumulator = accumulatorInitializer.apply();
-      for (SourceSorter<TOrigAccumulate, TSource, TSortKey> acc : sourceSorters) {
-        acc.sortAndAccumulate(lazySource, accumulator);
+      for (LazyAccumulator<TOrigAccumulate, TSource> acc : accumulators) {
+        acc.accumulate(lazySource, accumulator);
       }
       return resultSelector.apply(groupByKey, accumulator);
     };
   }
 
   /**
-   * Cache the input sources. (Will be sorted, aggregated in result selector.)
+   * Cache the input sources. (Will be aggregated in result selector.)
    *
    * @param <TSource> Type of the enumerable input source.
    */
@@ -100,6 +99,15 @@ public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
     }
   }
 
+  /**
+   * Accumulate on the cached input sources.
+   *
+   * @param <TOrigAccumulate> Type of the original accumulator
+   * @param <TSource> Type of the enumerable input source.
+   */
+  public interface LazyAccumulator<TOrigAccumulate, TSource> {
+    void accumulate(Iterable<TSource> sourceIterable, TOrigAccumulate accumulator);
+  }
 }
 
-// End OrderedAggregateLambdaFactory.java
+// End LazyAggregateLambdaFactory.java
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
index 9345da4..c91bb1a 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
@@ -25,13 +25,15 @@ import java.util.List;
 
 /**
  * Helper that combines the sorting process and accumulating process against the
- * aggregate execution, used with {@link OrderedAggregateLambdaFactory}.
+ * aggregate execution, used with {@link LazyAggregateLambdaFactory}.
  *
  * @param <TAccumulate> Type of the accumulator
- * @param <TSource> Type of the enumerable input source
- * @param <TSortKey> Type of the sort key
+ * @param <TSource>     Type of the enumerable input source
+ * @param <TSortKey>    Type of the sort key
  */
-public class SourceSorter<TAccumulate, TSource, TSortKey> {
+public class SourceSorter<TAccumulate, TSource, TSortKey>
+    implements LazyAggregateLambdaFactory.LazyAccumulator<TAccumulate, TSource> {
+
   private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
   private final Function1<TSource, TSortKey> keySelector;
   private final Comparator<TSortKey> comparator;
@@ -45,7 +47,12 @@ public class SourceSorter<TAccumulate, TSource, TSortKey> {
     this.comparator = comparator;
   }
 
-  public void sortAndAccumulate(Iterable<TSource> sourceIterable,
+  @Override public void accumulate(Iterable<TSource> sourceIterable,
+      TAccumulate accumulator) {
+    sortAndAccumulate(sourceIterable, accumulator);
+  }
+
+  private void sortAndAccumulate(Iterable<TSource> sourceIterable,
       TAccumulate accumulator) {
     List<TSource> sorted = Linq4j.asEnumerable(sourceIterable)
         .orderBy(keySelector, comparator)
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index fc7cb60..3169b55 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -18,8 +18,9 @@ package org.apache.calcite.util;
 
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.enumerable.AggregateLambdaFactory;
-import org.apache.calcite.adapter.enumerable.OrderedAggregateLambdaFactory;
-import org.apache.calcite.adapter.enumerable.SequencedAdderAggregateLambdaFactory;
+import org.apache.calcite.adapter.enumerable.BasicAggregateLambdaFactory;
+import org.apache.calcite.adapter.enumerable.BasicLazyAccumulator;
+import org.apache.calcite.adapter.enumerable.LazyAggregateLambdaFactory;
 import org.apache.calcite.adapter.enumerable.SourceSorter;
 import org.apache.calcite.adapter.java.ReflectiveSchema;
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
@@ -485,9 +486,10 @@ public enum BuiltInMethod {
       String.class),
   SOURCE_SORTER(SourceSorter.class, Function2.class, Function1.class,
       Comparator.class),
-  ORDERED_AGGREGATE_LAMBDA_FACTORY(OrderedAggregateLambdaFactory.class,
+  BASIC_LAZY_ACCUMULATOR(BasicLazyAccumulator.class, Function2.class),
+  LAZY_AGGREGATE_LAMBDA_FACTORY(LazyAggregateLambdaFactory.class,
       Function0.class, List.class),
-  SEQUENCED_ADDER_AGGREGATE_LAMBDA_FACTORY(SequencedAdderAggregateLambdaFactory.class,
+  BASIC_AGGREGATE_LAMBDA_FACTORY(BasicAggregateLambdaFactory.class,
       Function0.class, List.class),
   AGG_LAMBDA_FACTORY_ACC_INITIALIZER(AggregateLambdaFactory.class,
       "accumulatorInitializer"),
diff --git a/core/src/test/resources/sql/agg.iq b/core/src/test/resources/sql/agg.iq
index 83dcbca..41c1e95 100644
--- a/core/src/test/resources/sql/agg.iq
+++ b/core/src/test/resources/sql/agg.iq
@@ -2418,6 +2418,27 @@ EnumerableAggregate(group=[{7}], EMPNOS=[COLLECT($0) WITHIN GROUP ([0 DESC])])
   EnumerableTableScan(table=[[scott, EMP]])
 !plan
 
+select
+deptno,
+collect(empno) as empnos_1,
+collect(empno) within group (order by empno desc) as empnos_2
+from "scott".emp
+group by deptno;
+
++--------+--------------------------------------+--------------------------------------+
+| DEPTNO | EMPNOS_1                             | EMPNOS_2                             |
++--------+--------------------------------------+--------------------------------------+
+|     10 | [7782, 7839, 7934]                   | [7934, 7839, 7782]                   |
+|     20 | [7369, 7566, 7788, 7876, 7902]       | [7902, 7876, 7788, 7566, 7369]       |
+|     30 | [7499, 7521, 7654, 7698, 7844, 7900] | [7900, 7844, 7698, 7654, 7521, 7499] |
++--------+--------------------------------------+--------------------------------------+
+(3 rows)
+
+!ok
+EnumerableAggregate(group=[{7}], EMPNOS_1=[COLLECT($0)], EMPNOS_2=[COLLECT($0) WITHIN GROUP ([0 DESC])])
+  EnumerableTableScan(table=[[scott, EMP]])
+!plan
+
 select deptno, collect(empno) within group (order by empno desc)
 filter (where empno > 7500) as empnos
 from "scott".emp