You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by ho...@apache.org on 2019/03/07 03:26:30 UTC
[calcite] branch master updated: [CALCITE-2785] In EnumerableAggregate, wrong result produced if there are sorted aggregates and non-sorted aggregates at the same time
This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new f992e6c [CALCITE-2785] In EnumerableAggregate, wrong result produced if there are sorted aggregates and non-sorted aggregates at the same time
f992e6c is described below
commit f992e6c8fbc3712c6bd70096fb6b05da448b2cab
Author: hongzezhang <ho...@tencent.com>
AuthorDate: Thu Feb 28 11:38:42 2019 +0800
[CALCITE-2785] In EnumerableAggregate, wrong result produced if there are sorted aggregates and non-sorted aggregates at the same time
Rename SequencedAdderAggregateLambdaFactory to BasicAggregateLambdaFactory;
Rename OrderedAggregateLambdaFactory to LazyAggregateLambdaFactory;
Add BasicLazyAccumulator.
---
...ctory.java => BasicAggregateLambdaFactory.java} | 6 +--
.../adapter/enumerable/BasicLazyAccumulator.java | 45 ++++++++++++++++++++++
.../adapter/enumerable/EnumerableAggregate.java | 18 ++++++---
...actory.java => LazyAggregateLambdaFactory.java} | 40 +++++++++++--------
.../calcite/adapter/enumerable/SourceSorter.java | 17 +++++---
.../org/apache/calcite/util/BuiltInMethod.java | 10 +++--
core/src/test/resources/sql/agg.iq | 21 ++++++++++
7 files changed, 124 insertions(+), 33 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicAggregateLambdaFactory.java
similarity index 94%
rename from core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java
rename to core/src/main/java/org/apache/calcite/adapter/enumerable/BasicAggregateLambdaFactory.java
index 4b4577c..ee67523 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/SequencedAdderAggregateLambdaFactory.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicAggregateLambdaFactory.java
@@ -31,13 +31,13 @@ import java.util.List;
* @param <TResult> Type of the enumerable output result
* @param <TKey> Type of the group-by key
*/
-public class SequencedAdderAggregateLambdaFactory<TSource, TAccumulate, TResult, TKey>
+public class BasicAggregateLambdaFactory<TSource, TAccumulate, TResult, TKey>
implements AggregateLambdaFactory<TSource, TAccumulate, TAccumulate, TResult, TKey> {
private final Function0<TAccumulate> accumulatorInitializer;
private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdderDecorator;
- public SequencedAdderAggregateLambdaFactory(
+ public BasicAggregateLambdaFactory(
Function0<TAccumulate> accumulatorInitializer,
List<Function2<TAccumulate, TSource, TAccumulate>> accumulatorAdders) {
this.accumulatorInitializer = accumulatorInitializer;
@@ -85,4 +85,4 @@ public class SequencedAdderAggregateLambdaFactory<TSource, TAccumulate, TResult,
}
}
-// End SequencedAdderAggregateLambdaFactory.java
+// End BasicAggregateLambdaFactory.java
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicLazyAccumulator.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicLazyAccumulator.java
new file mode 100644
index 0000000..4374cca
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/BasicLazyAccumulator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.function.Function2;
+
+/**
+ * Calls an accumulator adder once for each element of a pre-collected input
+ * source, in iteration order; used with {@link LazyAggregateLambdaFactory}.
+ *
+ * @param <TAccumulate> Type of the accumulator
+ * @param <TSource> Type of the enumerable input source
+ */
+public class BasicLazyAccumulator<TAccumulate, TSource>
+ implements LazyAggregateLambdaFactory.LazyAccumulator<TAccumulate, TSource> {
+
+ private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder; // applied per element
+
+ public BasicLazyAccumulator(Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder) {
+ this.accumulatorAdder = accumulatorAdder;
+ }
+
+ @Override public void accumulate(Iterable<TSource> sourceIterable, TAccumulate accumulator) {
+ TAccumulate accumulator1 = accumulator; // running value handed to each adder call
+ for (TSource tSource : sourceIterable) {
+ accumulator1 = accumulatorAdder.apply(accumulator1, tSource);
+ } // NOTE(review): last adder result is dropped; presumably the adder mutates in place - confirm
+ }
+}
+
+// End BasicLazyAccumulator.java
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
index 3130bd0..aa33685 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableAggregate.java
@@ -485,11 +485,11 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
* <ul>
*
* <li>{@code hasOrderedCall == true} means there is at least one aggregate
- * call including sort spec. We use {@link OrderedAggregateLambdaFactory}
+ * call including sort spec. We use {@link LazyAggregateLambdaFactory}
* implementation to implement sorted aggregates for that.
*
* <li>{@code hasOrderedCall == false} indicates to use
- * {@link SequencedAdderAggregateLambdaFactory} to implement a non-sort
+ * {@link BasicAggregateLambdaFactory} to implement a non-sort
* aggregate.
*
* </ul>
@@ -502,12 +502,20 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
ParameterExpression lambdaFactory) {
if (hasOrderedCall) {
ParameterExpression pe = Expressions.parameter(List.class,
- builder.newName("sourceSorters"));
+ builder.newName("lazyAccumulators"));
builder.add(
Expressions.declare(0, pe, Expressions.new_(LinkedList.class)));
for (AggImpState agg : aggs) {
if (agg.call.collation.equals(RelCollations.EMPTY)) {
+ // if the call does not require ordering, fall back to
+ // using a non-sorted lazy accumulator.
+ builder.add(
+ Expressions.statement(
+ Expressions.call(pe,
+ BuiltInMethod.COLLECTION_ADD.method,
+ Expressions.new_(BuiltInMethod.BASIC_LAZY_ACCUMULATOR.constructor,
+ agg.accumulatorAdder))));
continue;
}
final Pair<Expression, Expression> pair =
@@ -523,7 +531,7 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
builder.add(
Expressions.declare(0, lambdaFactory,
Expressions.new_(
- BuiltInMethod.ORDERED_AGGREGATE_LAMBDA_FACTORY.constructor,
+ BuiltInMethod.LAZY_AGGREGATE_LAMBDA_FACTORY.constructor,
accumulatorInitializer, pe)));
} else {
// when hasOrderedCall == false
@@ -541,7 +549,7 @@ public class EnumerableAggregate extends Aggregate implements EnumerableRel {
builder.add(
Expressions.declare(0, lambdaFactory,
Expressions.new_(
- BuiltInMethod.SEQUENCED_ADDER_AGGREGATE_LAMBDA_FACTORY.constructor,
+ BuiltInMethod.BASIC_AGGREGATE_LAMBDA_FACTORY.constructor,
accumulatorInitializer, pe)));
}
}
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/LazyAggregateLambdaFactory.java
similarity index 69%
rename from core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java
rename to core/src/main/java/org/apache/calcite/adapter/enumerable/LazyAggregateLambdaFactory.java
index 3c7c72a..aa11ca1 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/OrderedAggregateLambdaFactory.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/LazyAggregateLambdaFactory.java
@@ -25,28 +25,27 @@ import java.util.Iterator;
import java.util.List;
/**
- * Generate aggregate lambdas that sorts the input source before calling each
- * aggregate adder.
+ * Generates aggregate lambdas that preserve the input source before calling each
+ * aggregate adder. This implementation is generally used when the input must be sorted
+ * before performing aggregation.
*
* @param <TSource> Type of the enumerable input source
* @param <TKey> Type of the group-by key
- * @param <TSortKey> Type of the sort key
* @param <TOrigAccumulate> Type of the original accumulator
* @param <TResult> Type of the enumerable output result
*/
-public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
- TOrigAccumulate, TResult>
+public class LazyAggregateLambdaFactory<TSource, TKey, TOrigAccumulate, TResult>
implements AggregateLambdaFactory<TSource, TOrigAccumulate,
- OrderedAggregateLambdaFactory.LazySource<TSource>, TResult, TKey> {
+ LazyAggregateLambdaFactory.LazySource<TSource>, TResult, TKey> {
private final Function0<TOrigAccumulate> accumulatorInitializer;
- private final List<SourceSorter<TOrigAccumulate, TSource, TSortKey>> sourceSorters;
+ private final List<LazyAccumulator<TOrigAccumulate, TSource>> accumulators;
- public OrderedAggregateLambdaFactory(
+ public LazyAggregateLambdaFactory(
Function0<TOrigAccumulate> accumulatorInitializer,
- List<SourceSorter<TOrigAccumulate, TSource, TSortKey>> sourceSorters) {
+ List<LazyAccumulator<TOrigAccumulate, TSource>> accumulators) {
this.accumulatorInitializer = accumulatorInitializer;
- this.sourceSorters = sourceSorters;
+ this.accumulators = accumulators;
}
public Function0<LazySource<TSource>> accumulatorInitializer() {
@@ -65,8 +64,8 @@ public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
Function1<TOrigAccumulate, TResult> resultSelector) {
return lazySource -> {
final TOrigAccumulate accumulator = accumulatorInitializer.apply();
- for (SourceSorter<TOrigAccumulate, TSource, TSortKey> acc : sourceSorters) {
- acc.sortAndAccumulate(lazySource, accumulator);
+ for (LazyAccumulator<TOrigAccumulate, TSource> acc : accumulators) {
+ acc.accumulate(lazySource, accumulator);
}
return resultSelector.apply(accumulator);
};
@@ -76,15 +75,15 @@ public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
Function2<TKey, TOrigAccumulate, TResult> resultSelector) {
return (groupByKey, lazySource) -> {
final TOrigAccumulate accumulator = accumulatorInitializer.apply();
- for (SourceSorter<TOrigAccumulate, TSource, TSortKey> acc : sourceSorters) {
- acc.sortAndAccumulate(lazySource, accumulator);
+ for (LazyAccumulator<TOrigAccumulate, TSource> acc : accumulators) {
+ acc.accumulate(lazySource, accumulator);
}
return resultSelector.apply(groupByKey, accumulator);
};
}
/**
- * Cache the input sources. (Will be sorted, aggregated in result selector.)
+ * Caches the input sources. (They will be aggregated in the result selector.)
*
* @param <TSource> Type of the enumerable input source.
*/
@@ -100,6 +99,15 @@ public class OrderedAggregateLambdaFactory<TSource, TKey, TSortKey,
}
}
+ /**
+ * Accumulates over the cached input sources, using the supplied accumulator.
+ *
+ * @param <TOrigAccumulate> Type of the original accumulator
+ * @param <TSource> Type of the enumerable input source.
+ */
+ public interface LazyAccumulator<TOrigAccumulate, TSource> {
+ void accumulate(Iterable<TSource> sourceIterable, TOrigAccumulate accumulator);
+ }
}
-// End OrderedAggregateLambdaFactory.java
+// End LazyAggregateLambdaFactory.java
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
index 9345da4..c91bb1a 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/SourceSorter.java
@@ -25,13 +25,15 @@ import java.util.List;
/**
* Helper that combines the sorting process and accumulating process against the
- * aggregate execution, used with {@link OrderedAggregateLambdaFactory}.
+ * aggregate execution, used with {@link LazyAggregateLambdaFactory}.
*
* @param <TAccumulate> Type of the accumulator
- * @param <TSource> Type of the enumerable input source
- * @param <TSortKey> Type of the sort key
+ * @param <TSource> Type of the enumerable input source
+ * @param <TSortKey> Type of the sort key
*/
-public class SourceSorter<TAccumulate, TSource, TSortKey> {
+public class SourceSorter<TAccumulate, TSource, TSortKey>
+ implements LazyAggregateLambdaFactory.LazyAccumulator<TAccumulate, TSource> {
+
private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
private final Function1<TSource, TSortKey> keySelector;
private final Comparator<TSortKey> comparator;
@@ -45,7 +47,12 @@ public class SourceSorter<TAccumulate, TSource, TSortKey> {
this.comparator = comparator;
}
- public void sortAndAccumulate(Iterable<TSource> sourceIterable,
+ @Override public void accumulate(Iterable<TSource> sourceIterable,
+ TAccumulate accumulator) {
+ sortAndAccumulate(sourceIterable, accumulator); // private helper; orders the source first
+ }
+
+ private void sortAndAccumulate(Iterable<TSource> sourceIterable,
TAccumulate accumulator) {
List<TSource> sorted = Linq4j.asEnumerable(sourceIterable)
.orderBy(keySelector, comparator)
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index fc7cb60..3169b55 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -18,8 +18,9 @@ package org.apache.calcite.util;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.AggregateLambdaFactory;
-import org.apache.calcite.adapter.enumerable.OrderedAggregateLambdaFactory;
-import org.apache.calcite.adapter.enumerable.SequencedAdderAggregateLambdaFactory;
+import org.apache.calcite.adapter.enumerable.BasicAggregateLambdaFactory;
+import org.apache.calcite.adapter.enumerable.BasicLazyAccumulator;
+import org.apache.calcite.adapter.enumerable.LazyAggregateLambdaFactory;
import org.apache.calcite.adapter.enumerable.SourceSorter;
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
@@ -485,9 +486,10 @@ public enum BuiltInMethod {
String.class),
SOURCE_SORTER(SourceSorter.class, Function2.class, Function1.class,
Comparator.class),
- ORDERED_AGGREGATE_LAMBDA_FACTORY(OrderedAggregateLambdaFactory.class,
+ BASIC_LAZY_ACCUMULATOR(BasicLazyAccumulator.class, Function2.class),
+ LAZY_AGGREGATE_LAMBDA_FACTORY(LazyAggregateLambdaFactory.class,
Function0.class, List.class),
- SEQUENCED_ADDER_AGGREGATE_LAMBDA_FACTORY(SequencedAdderAggregateLambdaFactory.class,
+ BASIC_AGGREGATE_LAMBDA_FACTORY(BasicAggregateLambdaFactory.class,
Function0.class, List.class),
AGG_LAMBDA_FACTORY_ACC_INITIALIZER(AggregateLambdaFactory.class,
"accumulatorInitializer"),
diff --git a/core/src/test/resources/sql/agg.iq b/core/src/test/resources/sql/agg.iq
index 83dcbca..41c1e95 100644
--- a/core/src/test/resources/sql/agg.iq
+++ b/core/src/test/resources/sql/agg.iq
@@ -2418,6 +2418,27 @@ EnumerableAggregate(group=[{7}], EMPNOS=[COLLECT($0) WITHIN GROUP ([0 DESC])])
EnumerableTableScan(table=[[scott, EMP]])
!plan
+select
+deptno,
+collect(empno) as empnos_1,
+collect(empno) within group (order by empno desc) as empnos_2
+from "scott".emp
+group by deptno;
+
++--------+--------------------------------------+--------------------------------------+
+| DEPTNO | EMPNOS_1 | EMPNOS_2 |
++--------+--------------------------------------+--------------------------------------+
+| 10 | [7782, 7839, 7934] | [7934, 7839, 7782] |
+| 20 | [7369, 7566, 7788, 7876, 7902] | [7902, 7876, 7788, 7566, 7369] |
+| 30 | [7499, 7521, 7654, 7698, 7844, 7900] | [7900, 7844, 7698, 7654, 7521, 7499] |
++--------+--------------------------------------+--------------------------------------+
+(3 rows)
+
+!ok
+EnumerableAggregate(group=[{7}], EMPNOS_1=[COLLECT($0)], EMPNOS_2=[COLLECT($0) WITHIN GROUP ([0 DESC])])
+ EnumerableTableScan(table=[[scott, EMP]])
+!plan
+
select deptno, collect(empno) within group (order by empno desc)
filter (where empno > 7500) as empnos
from "scott".emp