You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by ru...@apache.org on 2021/02/10 08:39:10 UTC
[calcite] branch master updated: [CALCITE-3221] Add MergeUnion
operator in Enumerable convention
This is an automated email from the ASF dual-hosted git repository.
rubenql pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 4cd90f3 [CALCITE-3221] Add MergeUnion operator in Enumerable convention
4cd90f3 is described below
commit 4cd90f36c3cf9a012e34f14129a907a4ce99c6f5
Author: rubenada <ru...@gmail.com>
AuthorDate: Mon Jan 18 16:30:07 2021 +0000
[CALCITE-3221] Add MergeUnion operator in Enumerable convention
---
.../adapter/enumerable/EnumerableMergeUnion.java | 118 +++++
.../enumerable/EnumerableMergeUnionRule.java | 105 +++++
.../adapter/enumerable/EnumerableRules.java | 6 +
.../calcite/rel/metadata/RelMdCollation.java | 12 +
.../java/org/apache/calcite/tools/Programs.java | 1 +
.../org/apache/calcite/util/BuiltInMethod.java | 2 +
.../apache/calcite/runtime/EnumerablesTest.java | 515 +++++++++++++++++++++
.../test/enumerable/EnumerableMergeUnionTest.java | 309 +++++++++++++
.../apache/calcite/linq4j/EnumerableDefaults.java | 37 +-
.../calcite/linq4j/MergeUnionEnumerator.java | 208 +++++++++
10 files changed, 1311 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeUnion.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeUnion.java
new file mode 100644
index 0000000..182e0a5
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeUnion.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link org.apache.calcite.rel.core.Union} in
+ * {@link org.apache.calcite.adapter.enumerable.EnumerableConvention enumerable calling convention}.
+ * Performs a union (or union all) of all its inputs (which must be already sorted),
+ * respecting the order. */
+public class EnumerableMergeUnion extends EnumerableUnion {
+
+ protected EnumerableMergeUnion(RelOptCluster cluster, RelTraitSet traitSet, List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traitSet, inputs, all);
+ final RelCollation collation = traitSet.getCollation();
+ if (collation == null || collation.getFieldCollations().isEmpty()) {
+ throw new IllegalArgumentException("EnumerableMergeUnion with no collation");
+ }
+ for (RelNode input : inputs) {
+ final RelCollation inputCollation = input.getTraitSet().getCollation();
+ if (inputCollation == null || !inputCollation.satisfies(collation)) {
+ throw new IllegalArgumentException("EnumerableMergeUnion input does not satisfy collation. "
+ + "EnumerableMergeUnion collation: " + collation + ". Input collation: "
+ + inputCollation + ". Input: " + input);
+ }
+ }
+ }
+
+ public static EnumerableMergeUnion create(RelCollation collation, List<RelNode> inputs,
+ boolean all) {
+ final RelOptCluster cluster = inputs.get(0).getCluster();
+ final RelTraitSet traitSet = cluster.traitSetOf(EnumerableConvention.INSTANCE).replace(
+ collation);
+ return new EnumerableMergeUnion(cluster, traitSet, inputs, all);
+ }
+
+ @Override public EnumerableMergeUnion copy(RelTraitSet traitSet, List<RelNode> inputs,
+ boolean all) {
+ return new EnumerableMergeUnion(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+ final BlockBuilder builder = new BlockBuilder();
+
+ final ParameterExpression inputListExp = Expressions.parameter(
+ List.class,
+ builder.newName("mergeUnionInputs" + getId()));
+ builder.add(Expressions.declare(0, inputListExp, Expressions.new_(ArrayList.class)));
+
+ for (Ord<RelNode> ord : Ord.zip(inputs)) {
+ final EnumerableRel input = (EnumerableRel) ord.e;
+ final Result result = implementor.visitChild(this, ord.i, input, pref);
+ final Expression childExp = builder.append("child" + ord.i, result.block);
+ builder.add(
+ Expressions.statement(
+ Expressions.call(inputListExp, BuiltInMethod.COLLECTION_ADD.method, childExp)));
+ }
+
+ final PhysType physType = PhysTypeImpl.of(
+ implementor.getTypeFactory(),
+ getRowType(),
+ pref.prefer(JavaRowFormat.CUSTOM));
+
+ final RelCollation collation = getTraitSet().getCollation();
+ if (collation == null || collation.getFieldCollations().isEmpty()) {
+ // should not happen
+ throw new IllegalStateException("EnumerableMergeUnion with no collation");
+ }
+ final Pair<Expression, Expression> pair =
+ physType.generateCollationKey(collation.getFieldCollations());
+ final Expression sortKeySelector = pair.left;
+ final Expression sortComparator = pair.right;
+
+ final Expression equalityComparator = Util.first(
+ physType.comparer(),
+ Expressions.call(BuiltInMethod.IDENTITY_COMPARER.method));
+
+ final Expression unionExp = Expressions.call(
+ BuiltInMethod.MERGE_UNION.method,
+ inputListExp,
+ sortKeySelector,
+ sortComparator,
+ Expressions.constant(all, boolean.class),
+ equalityComparator);
+ builder.add(unionExp);
+
+ return implementor.result(physType, builder.toBlock());
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeUnionRule.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeUnionRule.java
new file mode 100644
index 0000000..be19d1f
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeUnionRule.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalSort} on top of a
+ * {@link org.apache.calcite.rel.logical.LogicalUnion} into a {@link EnumerableMergeUnion}.
+ *
+ * @see EnumerableRules#ENUMERABLE_MERGE_UNION_RULE
+ */
+public class EnumerableMergeUnionRule extends RelRule<EnumerableMergeUnionRule.Config> {
+
+ /** Rule configuration. */
+ public interface Config extends RelRule.Config {
+ Config DEFAULT_CONFIG = EMPTY.withDescription("EnumerableMergeUnionRule").withOperandSupplier(
+ b0 -> b0.operand(LogicalSort.class).oneInput(
+ b1 -> b1.operand(LogicalUnion.class).anyInputs())).as(Config.class);
+
+ @Override default EnumerableMergeUnionRule toRule() {
+ return new EnumerableMergeUnionRule(this);
+ }
+ }
+
+ public EnumerableMergeUnionRule(Config config) {
+ super(config);
+ }
+
+ @Override public boolean matches(RelOptRuleCall call) {
+ final Sort sort = call.rel(0);
+ final RelCollation collation = sort.getCollation();
+ if (collation == null || collation.getFieldCollations().isEmpty()) {
+ return false;
+ }
+
+ final Union union = call.rel(1);
+ if (union.getInputs().size() < 2) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Sort sort = call.rel(0);
+ final RelCollation collation = sort.getCollation();
+ final Union union = call.rel(1);
+ final int unionInputsSize = union.getInputs().size();
+
+ // Push down sort limit, if possible.
+ RexNode inputFetch = null;
+ if (sort.fetch != null) {
+ if (sort.offset == null) {
+ inputFetch = sort.fetch;
+ } else if (sort.fetch instanceof RexLiteral && sort.offset instanceof RexLiteral) {
+ inputFetch = call.builder().literal(
+ RexLiteral.intValue(sort.fetch) + RexLiteral.intValue(sort.offset));
+ }
+ }
+
+ final List<RelNode> inputs = new ArrayList<>(unionInputsSize);
+ for (RelNode input : union.getInputs()) {
+ final RelNode newInput = sort.copy(sort.getTraitSet(), input, collation, null, inputFetch);
+ inputs.add(
+ convert(newInput, newInput.getTraitSet().replace(EnumerableConvention.INSTANCE)));
+ }
+
+ RelNode result = EnumerableMergeUnion.create(sort.getCollation(), inputs, union.all);
+
+ // If Sort contained a LIMIT / OFFSET, then put it back as an EnumerableLimit.
+ // The output of the MergeUnion is already sorted, so we do not need a sort anymore.
+ if (sort.offset != null || sort.fetch != null) {
+ result = EnumerableLimit.create(result, sort.offset, sort.fetch);
+ }
+
+ call.transformTo(result);
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
index 8816923..17a7465 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
@@ -103,6 +103,11 @@ public class EnumerableRules {
EnumerableRepeatUnionRule.DEFAULT_CONFIG
.toRule(EnumerableRepeatUnionRule.class);
+ /** Rule to convert a {@link org.apache.calcite.rel.logical.LogicalSort} on top of a
+ * {@link org.apache.calcite.rel.logical.LogicalUnion} into a {@link EnumerableMergeUnion}. */
+ public static final EnumerableMergeUnionRule ENUMERABLE_MERGE_UNION_RULE =
+ EnumerableMergeUnionRule.Config.DEFAULT_CONFIG.toRule();
+
/** Rule that converts a {@link LogicalTableSpool} into an
* {@link EnumerableTableSpool}. */
@Experimental
@@ -210,6 +215,7 @@ public class EnumerableRules {
EnumerableRules.ENUMERABLE_LIMIT_RULE,
EnumerableRules.ENUMERABLE_COLLECT_RULE,
EnumerableRules.ENUMERABLE_UNCOLLECT_RULE,
+ EnumerableRules.ENUMERABLE_MERGE_UNION_RULE,
EnumerableRules.ENUMERABLE_UNION_RULE,
EnumerableRules.ENUMERABLE_REPEAT_UNION_RULE,
EnumerableRules.ENUMERABLE_TABLE_SPOOL_RULE,
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
index 1fc2336..850f99a 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
@@ -19,6 +19,7 @@ package org.apache.calcite.rel.metadata;
import org.apache.calcite.adapter.enumerable.EnumerableCorrelate;
import org.apache.calcite.adapter.enumerable.EnumerableHashJoin;
import org.apache.calcite.adapter.enumerable.EnumerableMergeJoin;
+import org.apache.calcite.adapter.enumerable.EnumerableMergeUnion;
import org.apache.calcite.adapter.enumerable.EnumerableNestedLoopJoin;
import org.apache.calcite.adapter.jdbc.JdbcToEnumerableConverter;
import org.apache.calcite.linq4j.Ord;
@@ -175,6 +176,17 @@ public class RelMdCollation
join.getJoinType()));
}
+ public @Nullable ImmutableList<RelCollation> collations(EnumerableMergeUnion mergeUnion,
+ RelMetadataQuery mq) {
+ final RelCollation collation = mergeUnion.getTraitSet().getCollation();
+ if (collation == null) {
+ // should not happen
+ return null;
+ }
+ // MergeUnion guarantees order, like a sort
+ return copyOf(RelMdCollation.sort(collation));
+ }
+
public @Nullable ImmutableList<RelCollation> collations(EnumerableCorrelate join,
RelMetadataQuery mq) {
return copyOf(
diff --git a/core/src/main/java/org/apache/calcite/tools/Programs.java b/core/src/main/java/org/apache/calcite/tools/Programs.java
index c4f1429..b973c5c 100644
--- a/core/src/main/java/org/apache/calcite/tools/Programs.java
+++ b/core/src/main/java/org/apache/calcite/tools/Programs.java
@@ -81,6 +81,7 @@ public class Programs {
EnumerableRules.ENUMERABLE_SORT_RULE,
EnumerableRules.ENUMERABLE_LIMIT_RULE,
EnumerableRules.ENUMERABLE_UNION_RULE,
+ EnumerableRules.ENUMERABLE_MERGE_UNION_RULE,
EnumerableRules.ENUMERABLE_INTERSECT_RULE,
EnumerableRules.ENUMERABLE_MINUS_RULE,
EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE,
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 84008a6..69a632d 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -239,6 +239,8 @@ public enum BuiltInMethod {
CONCAT(ExtendedEnumerable.class, "concat", Enumerable.class),
REPEAT_UNION(EnumerableDefaults.class, "repeatUnion", Enumerable.class,
Enumerable.class, int.class, boolean.class, EqualityComparer.class),
+ MERGE_UNION(EnumerableDefaults.class, "mergeUnion", List.class, Function1.class,
+ Comparator.class, boolean.class, EqualityComparer.class),
LAZY_COLLECTION_SPOOL(EnumerableDefaults.class, "lazyCollectionSpool", Collection.class,
Enumerable.class),
INTERSECT(ExtendedEnumerable.class, "intersect", Enumerable.class, boolean.class),
diff --git a/core/src/test/java/org/apache/calcite/runtime/EnumerablesTest.java b/core/src/test/java/org/apache/calcite/runtime/EnumerablesTest.java
index f615d92..80d1a51 100644
--- a/core/src/test/java/org/apache/calcite/runtime/EnumerablesTest.java
+++ b/core/src/test/java/org/apache/calcite/runtime/EnumerablesTest.java
@@ -20,6 +20,7 @@ import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.EnumerableDefaults;
import org.apache.calcite.linq4j.JoinType;
import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.EqualityComparer;
import org.apache.calcite.linq4j.function.Function2;
import org.apache.calcite.linq4j.function.Functions;
import org.apache.calcite.linq4j.function.Predicate2;
@@ -31,8 +32,10 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
+import java.util.Objects;
import static com.google.common.collect.Lists.newArrayList;
@@ -966,6 +969,503 @@ class EnumerablesTest {
+ " null, Dept(30, Development)]"));
}
+ @Test void testMergeUnionAllEmptyOnRight() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Lilly"),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg"))),
+ Linq4j.emptyEnumerable()),
+ e -> e.deptno,
+ INTEGER_ASC,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo("[Emp(20, Lilly), Emp(30, Joe), Emp(30, Greg)]"));
+ }
+
+ @Test void testMergeUnionAllEmptyOnLeft() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.emptyEnumerable(),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Lilly"),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg")))),
+ e -> e.deptno,
+ INTEGER_ASC,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo("[Emp(20, Lilly), Emp(30, Joe), Emp(30, Greg)]"));
+ }
+
+ @Test void testMergeUnionAllEmptyOnBoth() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.emptyEnumerable(),
+ Linq4j.emptyEnumerable()),
+ e -> e.deptno,
+ INTEGER_ASC,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo("[]"));
+ }
+
+ @Test void testMergeUnionAllOrderByDeptAsc2inputs() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Lilly"),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(10, "Fred"),
+ new Emp(30, "Theodore"),
+ new Emp(40, "Sebastian")))),
+ e -> e.deptno,
+ INTEGER_ASC,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(10, Fred), Emp(20, Lilly), Emp(30, Joe), Emp(30, Greg), Emp(30, Theodore), Emp(40, Sebastian)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByDeptAsc3inputs() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Lilly"),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(15, "Phyllis"),
+ new Emp(18, "Maddie"),
+ new Emp(22, "Jenny"),
+ new Emp(42, "Susan"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(10, "Fred"),
+ new Emp(30, "Joe"),
+ new Emp(40, "Sebastian")))),
+ e -> e.deptno,
+ INTEGER_ASC,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(10, Fred), Emp(15, Phyllis), Emp(18, Maddie), Emp(20, Lilly), Emp(22, Jenny),"
+ + " Emp(30, Joe), Emp(30, Greg), Emp(30, Joe), Emp(40, Sebastian), Emp(42, Susan)]"));
+ }
+
+ @Test void testMergeUnionOrderByDeptAsc3inputs() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(15, "Phyllis"),
+ new Emp(15, "Phyllis"),
+ new Emp(20, "Lilly"),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(15, "Phyllis"),
+ new Emp(18, "Maddie"),
+ new Emp(22, "Jenny"),
+ new Emp(30, "Joe"),
+ new Emp(42, "Susan"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(10, "Fred"),
+ new Emp(15, "Phyllis"),
+ new Emp(30, "Joe"),
+ new Emp(30, "Joe"),
+ new Emp(40, "Sebastian")))),
+ e -> e.deptno,
+ INTEGER_ASC,
+ false,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(10, Fred), Emp(15, Phyllis), Emp(18, Maddie), Emp(20, Lilly), Emp(22, Jenny),"
+ + " Emp(30, Joe), Emp(30, Greg), Emp(40, Sebastian), Emp(42, Susan)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByDeptDesc2inputs() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(42, "Lilly"),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(50, "Fred"),
+ new Emp(30, "Theodore"),
+ new Emp(10, "Sebastian")))),
+ e -> e.deptno,
+ INTEGER_DESC,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(50, Fred), Emp(42, Lilly), Emp(30, Joe), Emp(30, Greg), Emp(30, Theodore), Emp(10, Sebastian)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByDeptDesc3inputs() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(35, "Lilly"),
+ new Emp(22, "Jenny"),
+ new Emp(20, "Joe"),
+ new Emp(20, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(45, "Phyllis"),
+ new Emp(42, "Maddie"),
+ new Emp(22, "Jenny"),
+ new Emp(22, "Jenny"),
+ new Emp(12, "Susan"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(50, "Fred"),
+ new Emp(20, "Theodore"),
+ new Emp(15, "Sebastian")))),
+ e -> e.deptno,
+ INTEGER_DESC,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(50, Fred), Emp(45, Phyllis), Emp(42, Maddie), Emp(35, Lilly), Emp(22, Jenny),"
+ + " Emp(22, Jenny), Emp(22, Jenny), Emp(20, Joe), Emp(20, Greg), Emp(20, Theodore), Emp(15, Sebastian), Emp(12, Susan)]"));
+ }
+
+ @Test void testMergeUnionOrderByDeptDesc3inputs() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(35, "Lilly"),
+ new Emp(22, "Jenny"),
+ new Emp(22, "Jenny"),
+ new Emp(20, "Joe"),
+ new Emp(20, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(45, "Phyllis"),
+ new Emp(42, "Maddie"),
+ new Emp(22, "Jenny"),
+ new Emp(12, "Susan"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(50, "Fred"),
+ new Emp(22, "Jenny"),
+ new Emp(20, "Theodore"),
+ new Emp(20, "Joe"),
+ new Emp(15, "Sebastian")))),
+ e -> e.deptno,
+ INTEGER_DESC,
+ false,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(50, Fred), Emp(45, Phyllis), Emp(42, Maddie), Emp(35, Lilly), Emp(22, Jenny),"
+ + " Emp(20, Joe), Emp(20, Greg), Emp(20, Theodore), Emp(15, Sebastian), Emp(12, Susan)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByNameAscNullsFirst() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(10, null),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(30, "Sebastian"),
+ new Emp(10, "Theodore")))),
+ e -> e.name,
+ STRING_ASC_NULLS_FIRST,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(20, null), Emp(10, null), Emp(20, null), Emp(30, Greg), Emp(30, Sebastian), Emp(10, Theodore)]"));
+ }
+
+ @Test void testMergeUnionOrderByNameAscNullsFirst() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(10, null),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(30, "Sebastian"),
+ new Emp(10, "Theodore")))),
+ e -> e.name,
+ STRING_ASC_NULLS_FIRST,
+ false,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(20, null), Emp(10, null), Emp(30, Greg), Emp(30, Sebastian), Emp(10, Theodore)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByNameDescNullsFirst() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(10, null),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(30, "Theodore"),
+ new Emp(10, "Sebastian")))),
+ e -> e.name,
+ STRING_DESC_NULLS_FIRST,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(20, null), Emp(10, null), Emp(20, null), Emp(30, Theodore), Emp(10, Sebastian), Emp(30, Greg)]"));
+ }
+
+ @Test void testMergeUnionOrderByNameDescNullsFirst() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(10, null),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(30, "Theodore"),
+ new Emp(10, "Sebastian")))),
+ e -> e.name,
+ STRING_DESC_NULLS_FIRST,
+ false,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(20, null), Emp(10, null), Emp(30, Theodore), Emp(10, Sebastian), Emp(30, Greg)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByNameAscNullsLast() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Greg"),
+ new Emp(10, null),
+ new Emp(30, null))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Greg"),
+ new Emp(30, "Sebastian"),
+ new Emp(30, "Theodore"),
+ new Emp(10, null)))),
+ e -> e.name,
+ STRING_ASC_NULLS_LAST,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(20, Greg), Emp(20, Greg), Emp(30, Sebastian), Emp(30, Theodore), Emp(10, null), Emp(30, null), Emp(10, null)]"));
+ }
+
+ @Test void testMergeUnionOrderByNameAscNullsLast() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Greg"),
+ new Emp(10, null),
+ new Emp(30, null))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Greg"),
+ new Emp(30, "Sebastian"),
+ new Emp(30, "Theodore"),
+ new Emp(10, null)))),
+ e -> e.name,
+ STRING_ASC_NULLS_LAST,
+ false,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(20, Greg), Emp(30, Sebastian), Emp(30, Theodore), Emp(10, null), Emp(30, null)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByNameDescNullsLast() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Greg"),
+ new Emp(10, null),
+ new Emp(30, null))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(30, "Theodore"),
+ new Emp(30, "Sebastian"),
+ new Emp(20, "Greg"),
+ new Emp(10, null)))),
+ e -> e.name,
+ STRING_DESC_NULLS_LAST,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(30, Theodore), Emp(30, Sebastian), Emp(20, Greg), Emp(20, Greg), Emp(10, null), Emp(30, null), Emp(10, null)]"));
+ }
+
+ @Test void testMergeUnionOrderByNameDescNullsLast() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Greg"),
+ new Emp(10, null),
+ new Emp(30, null))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(30, "Theodore"),
+ new Emp(30, "Sebastian"),
+ new Emp(20, "Greg"),
+ new Emp(10, null)))),
+ e -> e.name,
+ STRING_DESC_NULLS_LAST,
+ false,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(30, Theodore), Emp(30, Sebastian), Emp(20, Greg), Emp(10, null), Emp(30, null)]"));
+ }
+
+ @Test void testMergeUnionAllOrderByDeptAscNameDescNullsFirst() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(new Emp(10, null))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Lilly"),
+ new Emp(20, "Lilly"),
+ new Emp(20, "Antoine"),
+ new Emp(22, null),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(20, "Annie"),
+ new Emp(22, "Jenny"),
+ new Emp(42, "Susan"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(new Emp(50, "Lolly"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(10, "Fred"),
+ new Emp(20, "Lilly"),
+ new Emp(22, null),
+ new Emp(30, "Joe"),
+ new Emp(40, "Sebastian")))),
+ e -> e,
+ DEPT_ASC_AND_NAME_DESC_NULLS_FIRST,
+ true,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(10, null), Emp(10, Fred), Emp(20, null), Emp(20, Lilly), Emp(20, Lilly), Emp(20, Lilly),"
+ + " Emp(20, Antoine), Emp(20, Annie), Emp(22, null), Emp(22, null), Emp(22, Jenny),"
+ + " Emp(30, Joe), Emp(30, Joe), Emp(30, Greg), Emp(40, Sebastian), Emp(42, Susan), Emp(50, Lolly)]"));
+ }
+
+ @Test void testMergeUnionOrderByDeptAscNameDescNullsFirst() {
+ assertThat(
+ EnumerableDefaults.mergeUnion(
+ Arrays.asList(
+ Linq4j.asEnumerable(
+ Arrays.asList(new Emp(10, null))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, "Lilly"),
+ new Emp(20, "Lilly"),
+ new Emp(20, "Antoine"),
+ new Emp(22, null),
+ new Emp(30, "Joe"),
+ new Emp(30, "Greg"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(20, null),
+ new Emp(20, "Annie"),
+ new Emp(22, "Jenny"),
+ new Emp(42, "Susan"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(new Emp(50, "Lolly"))),
+ Linq4j.asEnumerable(
+ Arrays.asList(
+ new Emp(10, "Fred"),
+ new Emp(20, "Lilly"),
+ new Emp(22, null),
+ new Emp(30, "Joe"),
+ new Emp(40, "Sebastian")))),
+ e -> e,
+ DEPT_ASC_AND_NAME_DESC_NULLS_FIRST,
+ false,
+ EMP_EQUALITY_COMPARER).toList().toString(),
+ equalTo(
+ "[Emp(10, null), Emp(10, Fred), Emp(20, null), Emp(20, Lilly),"
+ + " Emp(20, Antoine), Emp(20, Annie), Emp(22, null), Emp(22, Jenny),"
+ + " Emp(30, Joe), Emp(30, Greg), Emp(40, Sebastian), Emp(42, Susan), Emp(50, Lolly)]"));
+ }
+
+ private static final Comparator<Integer> INTEGER_ASC = Integer::compare;
+ private static final Comparator<Integer> INTEGER_DESC = INTEGER_ASC.reversed();
+
+ private static final Comparator<String> STRING_ASC = Comparator.naturalOrder();
+ private static final Comparator<String> STRING_DESC = STRING_ASC.reversed();
+
+ private static final Comparator<String> STRING_ASC_NULLS_FIRST =
+ Comparator.nullsFirst(STRING_ASC);
+ private static final Comparator<String> STRING_ASC_NULLS_LAST =
+ Comparator.nullsLast(STRING_ASC);
+ private static final Comparator<String> STRING_DESC_NULLS_FIRST =
+ Comparator.nullsFirst(STRING_DESC);
+ private static final Comparator<String> STRING_DESC_NULLS_LAST =
+ Comparator.nullsLast(STRING_DESC);
+
+ private static final Comparator<Emp> DEPT_ASC_AND_NAME_DESC_NULLS_FIRST =
+ Comparator.<Emp>comparingInt(emp -> emp.deptno)
+ .thenComparing(emp -> emp.name, STRING_DESC_NULLS_FIRST);
+
+ private static final EqualityComparer<Emp> EMP_EQUALITY_COMPARER = Functions.identityComparer();
+
/** Employee record. */
private static class Emp {
final int deptno;
@@ -976,6 +1476,21 @@ class EnumerablesTest {
this.name = name;
}
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || this.getClass() != o.getClass()) {
+ return false;
+ }
+ final Emp emp = (Emp) o;
+ return this.deptno == emp.deptno && Objects.equals(this.name, emp.name);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(this.deptno, this.name);
+ }
+
@Override public String toString() {
return "Emp(" + deptno + ", " + name + ")";
}
diff --git a/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableMergeUnionTest.java b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableMergeUnionTest.java
new file mode 100644
index 0000000..483c0fc
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableMergeUnionTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+/**
+ * Unit test for
+ * {@link org.apache.calcite.adapter.enumerable.EnumerableMergeUnion}.
+ */
+class EnumerableMergeUnionTest {
+
+ @Test void mergeUnionAllOrderByEmpid() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select empid, name from emps where name like 'G%' union all select empid, name from emps where name like '%l') order by empid")
+ .explainContains("EnumerableMergeUnion(all=[true])\n"
+ + " EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "empid=1; name=Bill",
+ "empid=6; name=Guy",
+ "empid=10; name=Gabriel",
+ "empid=10; name=Gabriel",
+ "empid=12; name=Paul",
+ "empid=29; name=Anibal",
+ "empid=40; name=Emmanuel",
+ "empid=45; name=Pascal");
+ }
+
+ @Test void mergeUnionOrderByEmpid() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select empid, name from emps where name like 'G%' union select empid, name from emps where name like '%l') order by empid")
+ .explainContains("EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "empid=1; name=Bill",
+ "empid=6; name=Guy",
+ "empid=10; name=Gabriel",
+ "empid=12; name=Paul",
+ "empid=29; name=Anibal",
+ "empid=40; name=Emmanuel",
+ "empid=45; name=Pascal");
+ }
+
+ @Test void mergeUnionAllOrderByName() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select empid, name from emps where name like 'G%' union all select empid, name from emps where name like '%l') order by name")
+ .explainContains("EnumerableMergeUnion(all=[true])\n"
+ + " EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "empid=29; name=Anibal",
+ "empid=1; name=Bill",
+ "empid=40; name=Emmanuel",
+ "empid=10; name=Gabriel",
+ "empid=10; name=Gabriel",
+ "empid=6; name=Guy",
+ "empid=45; name=Pascal",
+ "empid=12; name=Paul");
+ }
+
+ @Test void mergeUnionOrderByName() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select empid, name from emps where name like 'G%' union select empid, name from emps where name like '%l') order by name")
+ .explainContains("EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "empid=29; name=Anibal",
+ "empid=1; name=Bill",
+ "empid=40; name=Emmanuel",
+ "empid=10; name=Gabriel",
+ "empid=6; name=Guy",
+ "empid=45; name=Pascal",
+ "empid=12; name=Paul");
+ }
+
+ @Test void mergeUnionSingleColumnOrderByName() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select name from emps where name like 'G%' union select name from emps where name like '%l') order by name")
+ .explainContains("EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "name=Anibal",
+ "name=Bill",
+ "name=Emmanuel",
+ "name=Gabriel",
+ "name=Guy",
+ "name=Pascal",
+ "name=Paul");
+ }
+
+ @Test void mergeUnionOrderByNameWithLimit() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select empid, name from emps where name like 'G%' union select empid, name from emps where name like '%l') order by name limit 3")
+ .explainContains("EnumerableLimit(fetch=[3])\n"
+ + " EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], empid=[$t0], name=[$t2])\n"
+ + " EnumerableLimitSort(sort0=[$2], dir0=[ASC], fetch=[3])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], proj#0..4=[{exprs}], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], empid=[$t0], name=[$t2])\n"
+ + " EnumerableLimitSort(sort0=[$2], dir0=[ASC], fetch=[3])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], proj#0..4=[{exprs}], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "empid=29; name=Anibal",
+ "empid=1; name=Bill",
+ "empid=40; name=Emmanuel");
+ }
+
+ @Test void mergeUnionOrderByNameWithOffset() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select empid, name from emps where name like 'G%' union select empid, name from emps where name like '%l') order by name offset 2")
+ .explainContains("EnumerableLimit(offset=[2])\n"
+ + " EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "empid=40; name=Emmanuel",
+ "empid=10; name=Gabriel",
+ "empid=6; name=Guy",
+ "empid=45; name=Pascal",
+ "empid=12; name=Paul");
+ }
+
+ @Test void mergeUnionOrderByNameWithLimitAndOffset() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select empid, name from emps where name like 'G%' union select empid, name from emps where name like '%l') order by name limit 3 offset 2")
+ .explainContains("EnumerableLimit(offset=[2], fetch=[3])\n"
+ + " EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], empid=[$t0], name=[$t2])\n"
+ + " EnumerableLimitSort(sort0=[$2], dir0=[ASC], fetch=[5])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['G%'], expr#6=[LIKE($t2, $t5)], proj#0..4=[{exprs}], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], empid=[$t0], name=[$t2])\n"
+ + " EnumerableLimitSort(sort0=[$2], dir0=[ASC], fetch=[5])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%l'], expr#6=[LIKE($t2, $t5)], proj#0..4=[{exprs}], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "empid=40; name=Emmanuel",
+ "empid=10; name=Gabriel",
+ "empid=6; name=Guy");
+ }
+
+ @Test void mergeUnionAllOrderByCommissionAscNullsFirstAndNameDesc() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select commission, name from emps where name like 'R%' union all select commission, name from emps where name like '%y%') order by commission asc nulls first, name desc")
+ .explainContains("EnumerableMergeUnion(all=[true])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['R%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%y%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "commission=null; name=Taylor",
+ "commission=null; name=Riyad",
+ "commission=null; name=Riyad",
+ "commission=null; name=Ralf",
+ "commission=250; name=Seohyun",
+ "commission=250; name=Hyuna",
+ "commission=250; name=Andy",
+ "commission=500; name=Kylie",
+ "commission=500; name=Guy");
+ }
+
+ @Test void mergeUnionOrderByCommissionAscNullsFirstAndNameDesc() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select commission, name from emps where name like 'R%' union select commission, name from emps where name like '%y%') order by commission asc nulls first, name desc")
+ .explainContains("EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['R%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC-nulls-first], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%y%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "commission=null; name=Taylor",
+ "commission=null; name=Riyad",
+ "commission=null; name=Ralf",
+ "commission=250; name=Seohyun",
+ "commission=250; name=Hyuna",
+ "commission=250; name=Andy",
+ "commission=500; name=Kylie",
+ "commission=500; name=Guy");
+ }
+
+ @Test void mergeUnionAllOrderByCommissionAscNullsLastAndNameDesc() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select commission, name from emps where name like 'R%' union all select commission, name from emps where name like '%y%') order by commission asc nulls last, name desc")
+ .explainContains("EnumerableMergeUnion(all=[true])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['R%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%y%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "commission=250; name=Seohyun",
+ "commission=250; name=Hyuna",
+ "commission=250; name=Andy",
+ "commission=500; name=Kylie",
+ "commission=500; name=Guy",
+ "commission=null; name=Taylor",
+ "commission=null; name=Riyad",
+ "commission=null; name=Riyad",
+ "commission=null; name=Ralf");
+ }
+
+ @Test void mergeUnionOrderByCommissionAscNullsLastAndNameDesc() {
+ tester(false,
+ new JdbcTest.HrSchemaBig(),
+ "select * from (select commission, name from emps where name like 'R%' union select commission, name from emps where name like '%y%') order by commission asc nulls last, name desc")
+ .explainContains("EnumerableMergeUnion(all=[false])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['R%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=['%y%'], expr#6=[LIKE($t2, $t5)], commission=[$t4], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsOrdered(
+ "commission=250; name=Seohyun",
+ "commission=250; name=Hyuna",
+ "commission=250; name=Andy",
+ "commission=500; name=Kylie",
+ "commission=500; name=Guy",
+ "commission=null; name=Taylor",
+ "commission=null; name=Riyad",
+ "commission=null; name=Ralf");
+ }
+
+ private CalciteAssert.AssertQuery tester(boolean forceDecorrelate,
+ Object schema, String sqlQuery) {
+ return CalciteAssert.that()
+ .with(CalciteConnectionProperty.LEX, Lex.JAVA)
+ .with(CalciteConnectionProperty.FORCE_DECORRELATE, forceDecorrelate)
+ .withSchema("s", new ReflectiveSchema(schema))
+ .query(sqlQuery)
+ .withHook(Hook.PLANNER, (Consumer<RelOptPlanner>) planner -> {
+ // Force UNION to be implemented via EnumerableMergeUnion
+ planner.removeRule(EnumerableRules.ENUMERABLE_UNION_RULE);
+ // Allow EnumerableLimitSort optimization
+ planner.addRule(EnumerableRules.ENUMERABLE_LIMIT_SORT_RULE);
+ });
+ }
+}
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
index 7a7ff63..4f8271d 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
@@ -3674,7 +3674,7 @@ public abstract class EnumerableDefaults {
return a0 -> a0.element;
}
- private static <TSource> Function1<TSource, Wrapped<TSource>> wrapperFor(
+ static <TSource> Function1<TSource, Wrapped<TSource>> wrapperFor(
final EqualityComparer<TSource> comparer) {
return a0 -> Wrapped.upAs(comparer, a0);
}
@@ -3997,7 +3997,7 @@ public abstract class EnumerableDefaults {
/** Value wrapped with a comparer.
*
* @param <T> element type */
- private static class Wrapped<T> {
+ static class Wrapped<T> {
private final EqualityComparer<T> comparer;
private final T element;
@@ -4683,4 +4683,37 @@ public abstract class EnumerableDefaults {
}
};
}
+
+ /**
+ * Merge Union Enumerable.
+ * Performs a union (or union all) of all its inputs (which must be already sorted),
+ * respecting the order.
+ *
+ * @param sources input enumerables (must be already sorted)
+ * @param sortKeySelector sort key selector
+ * @param sortComparator sort comparator to decide the next item
+ * @param all whether duplicates will be considered or not
+ * @param equalityComparer {@link EqualityComparer} to control duplicates,
+ * only used if {@code all} is {@code false}
+ * @param <TSource> record type
+ * @param <TKey> sort key
+ */
+ public static <TSource, TKey> Enumerable<TSource> mergeUnion(
+ List<Enumerable<TSource>> sources,
+ Function1<TSource, TKey> sortKeySelector,
+ Comparator<TKey> sortComparator,
+ boolean all,
+ EqualityComparer<TSource> equalityComparer) {
+ return new AbstractEnumerable<TSource>() {
+ @Override public Enumerator<TSource> enumerator() {
+ return new MergeUnionEnumerator<>(
+ sources,
+ sortKeySelector,
+ sortComparator,
+ all,
+ equalityComparer);
+ }
+ };
+ }
+
}
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/MergeUnionEnumerator.java b/linq4j/src/main/java/org/apache/calcite/linq4j/MergeUnionEnumerator.java
new file mode 100644
index 0000000..841c34f
--- /dev/null
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/MergeUnionEnumerator.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j;
+
+import org.apache.calcite.linq4j.function.EqualityComparer;
+import org.apache.calcite.linq4j.function.Function1;
+
+import org.checkerframework.checker.initialization.qual.UnknownInitialization;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Performs a union (or union all) of all its inputs (which must be already sorted),
+ * respecting the order.
+ * @param <TSource> record type
+ * @param <TKey> sort key
+ */
+final class MergeUnionEnumerator<TSource, TKey> implements Enumerator<TSource> {
+ private final Enumerator<TSource>[] inputs;
+ private final TSource[] currentInputsValues;
+ private final boolean[] inputsFinished;
+ private final Function1<TSource, TKey> sortKeySelector;
+ private final Comparator<TKey> sortComparator;
+ private TSource currentValue;
+ private int activeInputs;
+
+ // Set to control duplicates, only used if "all" is false
+ private final @Nullable Set<EnumerableDefaults.Wrapped<TSource>> processed;
+ private final @Nullable Function1<TSource, EnumerableDefaults.Wrapped<TSource>> wrapper;
+ private @Nullable TKey currentKeyInProcessedSet;
+
+ private static final Object NOT_INIT = new Object();
+
+ MergeUnionEnumerator(
+ List<Enumerable<TSource>> sources,
+ Function1<TSource, TKey> sortKeySelector,
+ Comparator<TKey> sortComparator,
+ boolean all,
+ EqualityComparer<TSource> equalityComparer) {
+ this.sortKeySelector = sortKeySelector;
+ this.sortComparator = sortComparator;
+
+ if (all) {
+ this.processed = null;
+ this.wrapper = null;
+ } else {
+ this.processed = new HashSet<>();
+ this.wrapper = EnumerableDefaults.wrapperFor(equalityComparer);
+ }
+
+ final int size = sources.size();
+ //noinspection unchecked
+ this.inputs = new Enumerator[size];
+ int i = 0;
+ for (Enumerable<TSource> source : sources) {
+ this.inputs[i++] = source.enumerator();
+ }
+
+ //noinspection unchecked
+ this.currentInputsValues = (TSource[]) new Object[size];
+ this.activeInputs = this.currentInputsValues.length;
+ this.inputsFinished = new boolean[size];
+ //noinspection unchecked
+ this.currentValue = (TSource) NOT_INIT;
+
+ initEnumerators();
+ }
+
+ @RequiresNonNull("inputs")
+ @SuppressWarnings("method.invocation.invalid")
+ private void initEnumerators(@UnknownInitialization MergeUnionEnumerator<TSource, TKey> this) {
+ for (int i = 0; i < inputs.length; i++) {
+ moveEnumerator(i);
+ }
+ }
+
+ private void moveEnumerator(int i) {
+ final Enumerator<TSource> enumerator = inputs[i];
+ if (!enumerator.moveNext()) {
+ activeInputs--;
+ inputsFinished[i] = true;
+ @Nullable TSource[] auxInputsValues = currentInputsValues;
+ auxInputsValues[i] = null;
+ } else {
+ currentInputsValues[i] = enumerator.current();
+ inputsFinished[i] = false;
+ }
+ }
+
+ private boolean checkNotDuplicated(TSource value) {
+ if (processed == null) {
+ return true; // UNION ALL: no need to check duplicates
+ }
+
+ // check duplicates
+ @SuppressWarnings("dereference.of.nullable")
+ final EnumerableDefaults.Wrapped<TSource> wrapped = wrapper.apply(value);
+ if (!processed.contains(wrapped)) {
+ final TKey key = sortKeySelector.apply(value);
+ if (!processed.isEmpty()) {
+ // Since inputs are sorted, we do not need to keep in the set all the items that we
+ // have previously returned, just the ones with the same key, as soon as we see a new
+ // key, we can clear the set containing the items belonging to the previous key
+ @SuppressWarnings("argument.type.incompatible")
+ final int sortComparison = sortComparator.compare(key, currentKeyInProcessedSet);
+ if (sortComparison != 0) {
+ processed.clear();
+ currentKeyInProcessedSet = key;
+ }
+ } else {
+ currentKeyInProcessedSet = key;
+ }
+ processed.add(wrapped);
+ return true;
+ }
+ return false;
+ }
+
+ private int compare(TSource e1, TSource e2) {
+ final TKey key1 = sortKeySelector.apply(e1);
+ final TKey key2 = sortKeySelector.apply(e2);
+ return sortComparator.compare(key1, key2);
+ }
+
+ @Override public TSource current() {
+ if (currentValue == NOT_INIT) {
+ throw new NoSuchElementException();
+ }
+ return currentValue;
+ }
+
+ @Override public boolean moveNext() {
+ while (activeInputs > 0) {
+ int candidateIndex = -1;
+ for (int i = 0; i < currentInputsValues.length; i++) {
+ if (!inputsFinished[i]) {
+ candidateIndex = i;
+ break;
+ }
+ }
+
+ if (activeInputs > 1) {
+ for (int i = candidateIndex + 1; i < currentInputsValues.length; i++) {
+ if (inputsFinished[i]) {
+ continue;
+ }
+
+ final int comp = compare(
+ currentInputsValues[candidateIndex],
+ currentInputsValues[i]);
+ if (comp > 0) {
+ candidateIndex = i;
+ }
+ }
+ }
+
+ if (checkNotDuplicated(currentInputsValues[candidateIndex])) {
+ currentValue = currentInputsValues[candidateIndex];
+ moveEnumerator(candidateIndex);
+ return true;
+ } else {
+ moveEnumerator(candidateIndex);
+ // continue loop
+ }
+ }
+ return false;
+ }
+
+ @Override public void reset() {
+ for (Enumerator<TSource> enumerator : inputs) {
+ enumerator.reset();
+ }
+ if (processed != null) {
+ processed.clear();
+ currentKeyInProcessedSet = null;
+ }
+ //noinspection unchecked
+ currentValue = (TSource) NOT_INIT;
+ activeInputs = currentInputsValues.length;
+ initEnumerators();
+ }
+
+ @Override public void close() {
+ for (Enumerator<TSource> enumerator : inputs) {
+ enumerator.close();
+ }
+ }
+}