You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by li...@apache.org on 2022/03/04 13:24:38 UTC
[calcite] 34/41: [CALCITE-3673] ListTransientTable should not leave tables in the schema; [CALCITE-4054] RepeatUnion containing a Correlate with a transientScan on its RHS causes NPE
This is an automated email from the ASF dual-hosted git repository.
liyafan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
commit 296fc3e7cded904c90fa90bbb87e5b13c51567e0
Author: rubenada <ru...@gmail.com>
AuthorDate: Thu Jan 13 14:12:29 2022 +0000
[CALCITE-3673] ListTransientTable should not leave tables in the schema
[CALCITE-4054] RepeatUnion containing a Correlate with a transientScan on its RHS causes NPE
---
.../adapter/enumerable/EnumerableRepeatUnion.java | 42 +++++++++-
.../enumerable/EnumerableRepeatUnionRule.java | 3 +-
.../org/apache/calcite/jdbc/CalciteSchema.java | 4 +
.../apache/calcite/prepare/RelOptTableImpl.java | 4 +
.../org/apache/calcite/rel/core/RelFactories.java | 6 +-
.../org/apache/calcite/rel/core/RepeatUnion.java | 23 +++++-
.../calcite/rel/logical/LogicalRepeatUnion.java | 19 +++--
.../java/org/apache/calcite/schema/SchemaPlus.java | 7 ++
.../calcite/schema/impl/ListTransientTable.java | 11 ++-
.../java/org/apache/calcite/tools/RelBuilder.java | 2 +-
.../org/apache/calcite/util/BuiltInMethod.java | 5 +-
.../test/enumerable/EnumerableRepeatUnionTest.java | 91 ++++++++++++++++++++++
.../apache/calcite/linq4j/EnumerableDefaults.java | 7 +-
13 files changed, 198 insertions(+), 26 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnion.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnion.java
index 9fc2764..c6e3fda 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnion.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnion.java
@@ -17,17 +17,23 @@
package org.apache.calcite.adapter.enumerable;
import org.apache.calcite.linq4j.function.Experimental;
+import org.apache.calcite.linq4j.function.Function0;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RepeatUnion;
+import org.apache.calcite.schema.TransientTable;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Util;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
import java.util.List;
+import java.util.Objects;
/**
* Implementation of {@link RepeatUnion} in
@@ -43,14 +49,15 @@ public class EnumerableRepeatUnion extends RepeatUnion implements EnumerableRel
* Creates an EnumerableRepeatUnion.
*/
EnumerableRepeatUnion(RelOptCluster cluster, RelTraitSet traitSet,
- RelNode seed, RelNode iterative, boolean all, int iterationLimit) {
- super(cluster, traitSet, seed, iterative, all, iterationLimit);
+ RelNode seed, RelNode iterative, boolean all, int iterationLimit,
+ @Nullable RelOptTable transientTable) {
+ super(cluster, traitSet, seed, iterative, all, iterationLimit, transientTable);
}
@Override public EnumerableRepeatUnion copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.size() == 2;
return new EnumerableRepeatUnion(getCluster(), traitSet,
- inputs.get(0), inputs.get(1), all, iterationLimit);
+ inputs.get(0), inputs.get(1), all, iterationLimit, transientTable);
}
@Override public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
@@ -61,6 +68,32 @@ public class EnumerableRepeatUnion extends RepeatUnion implements EnumerableRel
RelNode seed = getSeedRel();
RelNode iteration = getIterativeRel();
+ Expression cleanUpFunctionExp = Expressions.constant(null);
+ if (transientTable != null) {
+ // root.getRootSchema().add(tableName, table);
+ Expression tableExp = implementor.stash(
+ Objects.requireNonNull(transientTable.unwrap(TransientTable.class)),
+ TransientTable.class);
+ String tableName =
+ transientTable.getQualifiedName().get(transientTable.getQualifiedName().size() - 1);
+ Expression tableNameExp = Expressions.constant(tableName, String.class);
+ builder.append(
+ Expressions.call(
+ Expressions.call(
+ implementor.getRootExpression(),
+ BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method),
+ BuiltInMethod.SCHEMA_PLUS_ADD_TABLE.method,
+ tableNameExp,
+ tableExp));
+ // root.getRootSchema().removeTable(tableName);
+ cleanUpFunctionExp = Expressions.lambda(Function0.class,
+ Expressions.call(
+ Expressions.call(
+ implementor.getRootExpression(),
+ BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method),
+ BuiltInMethod.SCHEMA_PLUS_REMOVE_TABLE.method, tableNameExp));
+ }
+
Result seedResult = implementor.visitChild(this, 0, (EnumerableRel) seed, pref);
Result iterationResult = implementor.visitChild(this, 1, (EnumerableRel) iteration, pref);
@@ -78,7 +111,8 @@ public class EnumerableRepeatUnion extends RepeatUnion implements EnumerableRel
iterativeExp,
Expressions.constant(iterationLimit, int.class),
Expressions.constant(all, boolean.class),
- Util.first(physType.comparer(), Expressions.call(BuiltInMethod.IDENTITY_COMPARER.method)));
+ Util.first(physType.comparer(), Expressions.call(BuiltInMethod.IDENTITY_COMPARER.method)),
+ cleanUpFunctionExp);
builder.add(unionExp);
return implementor.result(physType, builder.toBlock());
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnionRule.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnionRule.java
index cb3cdf3..308b2aa 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnionRule.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRepeatUnionRule.java
@@ -54,6 +54,7 @@ public class EnumerableRepeatUnionRule extends ConverterRule {
convert(seedRel, seedRel.getTraitSet().replace(out)),
convert(iterativeRel, iterativeRel.getTraitSet().replace(out)),
union.all,
- union.iterationLimit);
+ union.iterationLimit,
+ union.getTransientTable());
}
}
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
index e0afc8e..05eee7f 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
@@ -730,6 +730,10 @@ public abstract class CalciteSchema {
CalciteSchema.this.add(name, table);
}
+ @Override public boolean removeTable(String name) {
+ return CalciteSchema.this.removeTable(name);
+ }
+
@Override public void add(String name, Function function) {
CalciteSchema.this.add(name, function);
}
diff --git a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
index fac9293..0878292 100644
--- a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
@@ -473,6 +473,10 @@ public class RelOptTableImpl extends Prepare.AbstractPreparingTable {
throw new UnsupportedOperationException();
}
+ @Override public boolean removeTable(String name) {
+ throw new UnsupportedOperationException();
+ }
+
@Override public void add(String name,
org.apache.calcite.schema.Function function) {
throw new UnsupportedOperationException();
diff --git a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
index 0998c99..6d5e2c0 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
@@ -610,7 +610,7 @@ public class RelFactories {
public interface RepeatUnionFactory {
/** Creates a {@link RepeatUnion}. */
RelNode createRepeatUnion(RelNode seed, RelNode iterative, boolean all,
- int iterationLimit);
+ int iterationLimit, RelOptTable table);
}
/**
@@ -619,8 +619,8 @@ public class RelFactories {
*/
private static class RepeatUnionFactoryImpl implements RepeatUnionFactory {
@Override public RelNode createRepeatUnion(RelNode seed, RelNode iterative,
- boolean all, int iterationLimit) {
- return LogicalRepeatUnion.create(seed, iterative, all, iterationLimit);
+ boolean all, int iterationLimit, RelOptTable table) {
+ return LogicalRepeatUnion.create(seed, iterative, all, iterationLimit, table);
}
}
diff --git a/core/src/main/java/org/apache/calcite/rel/core/RepeatUnion.java b/core/src/main/java/org/apache/calcite/rel/core/RepeatUnion.java
index f1419d7..073861e 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/RepeatUnion.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/RepeatUnion.java
@@ -18,15 +18,20 @@ package org.apache.calcite.rel.core;
import org.apache.calcite.linq4j.function.Experimental;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.TransientTable;
import org.apache.calcite.util.Util;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
import java.util.List;
+import java.util.Objects;
/**
* Relational expression that computes a repeat union (recursive union in SQL
@@ -40,7 +45,7 @@ import java.util.List;
*
* <li>Evaluate the right input (i.e., iterative relational expression) over and
* over until it produces no more results (or until an optional maximum number
- * of iterations is reached). For UNION (but not UNION ALL), discard
+ * of iterations is reached). For UNION (but not UNION ALL), discard
* duplicated results.
* </ul>
*
@@ -61,12 +66,22 @@ public abstract class RepeatUnion extends BiRel {
*/
public final int iterationLimit;
+ /**
+ * Transient table where repeat union's intermediate results will be stored (optional).
+ */
+ protected final @Nullable RelOptTable transientTable;
+
//~ Constructors -----------------------------------------------------------
protected RepeatUnion(RelOptCluster cluster, RelTraitSet traitSet,
- RelNode seed, RelNode iterative, boolean all, int iterationLimit) {
+ RelNode seed, RelNode iterative, boolean all, int iterationLimit,
+ @Nullable RelOptTable transientTable) {
super(cluster, traitSet, seed, iterative);
this.iterationLimit = iterationLimit;
this.all = all;
+ this.transientTable = transientTable;
+ if (transientTable != null) {
+ Objects.requireNonNull(transientTable.unwrap(TransientTable.class));
+ }
}
@Override public double estimateRowCount(RelMetadataQuery mq) {
@@ -95,6 +110,10 @@ public abstract class RepeatUnion extends BiRel {
return right;
}
+ public @Nullable RelOptTable getTransientTable() {
+ return transientTable;
+ }
+
@Override protected RelDataType deriveRowType() {
final List<RelDataType> inputRowTypes =
Util.transform(getInputs(), RelNode::getRowType);
diff --git a/core/src/main/java/org/apache/calcite/rel/logical/LogicalRepeatUnion.java b/core/src/main/java/org/apache/calcite/rel/logical/LogicalRepeatUnion.java
index 5084176..f3bface 100644
--- a/core/src/main/java/org/apache/calcite/rel/logical/LogicalRepeatUnion.java
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalRepeatUnion.java
@@ -19,10 +19,13 @@ package org.apache.calcite.rel.logical;
import org.apache.calcite.linq4j.function.Experimental;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RepeatUnion;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
import java.util.List;
/**
@@ -37,22 +40,24 @@ public class LogicalRepeatUnion extends RepeatUnion {
//~ Constructors -----------------------------------------------------------
private LogicalRepeatUnion(RelOptCluster cluster, RelTraitSet traitSet,
- RelNode seed, RelNode iterative, boolean all, int iterationLimit) {
- super(cluster, traitSet, seed, iterative, all, iterationLimit);
+ RelNode seed, RelNode iterative, boolean all, int iterationLimit,
+ @Nullable RelOptTable transientTable) {
+ super(cluster, traitSet, seed, iterative, all, iterationLimit, transientTable);
}
/** Creates a LogicalRepeatUnion. */
public static LogicalRepeatUnion create(RelNode seed, RelNode iterative,
- boolean all) {
- return create(seed, iterative, all, -1);
+ boolean all, @Nullable RelOptTable transientTable) {
+ return create(seed, iterative, all, -1, transientTable);
}
/** Creates a LogicalRepeatUnion. */
public static LogicalRepeatUnion create(RelNode seed, RelNode iterative,
- boolean all, int iterationLimit) {
+ boolean all, int iterationLimit, @Nullable RelOptTable transientTable) {
RelOptCluster cluster = seed.getCluster();
RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
- return new LogicalRepeatUnion(cluster, traitSet, seed, iterative, all, iterationLimit);
+ return new LogicalRepeatUnion(cluster, traitSet, seed, iterative, all, iterationLimit,
+ transientTable);
}
//~ Methods ----------------------------------------------------------------
@@ -62,6 +67,6 @@ public class LogicalRepeatUnion extends RepeatUnion {
assert traitSet.containsIfApplicable(Convention.NONE);
assert inputs.size() == 2;
return new LogicalRepeatUnion(getCluster(), traitSet,
- inputs.get(0), inputs.get(1), all, iterationLimit);
+ inputs.get(0), inputs.get(1), all, iterationLimit, transientTable);
}
}
diff --git a/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java b/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java
index 7052603..5f41788 100644
--- a/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java
+++ b/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java
@@ -68,6 +68,13 @@ public interface SchemaPlus extends Schema {
/** Adds a table to this schema. */
void add(String name, Table table);
+ /** Removes a table from this schema, used e.g. to clean-up temporary tables. */
+ default boolean removeTable(String name) {
+ // Default implementation provided for backwards compatibility, to be removed before 2.0
+ return false;
+ }
+
+
/** Adds a function to this schema. */
void add(String name, Function function);
diff --git a/core/src/main/java/org/apache/calcite/schema/impl/ListTransientTable.java b/core/src/main/java/org/apache/calcite/schema/impl/ListTransientTable.java
index 0ee6ec4..653a66f 100644
--- a/core/src/main/java/org/apache/calcite/schema/impl/ListTransientTable.java
+++ b/core/src/main/java/org/apache/calcite/schema/impl/ListTransientTable.java
@@ -49,8 +49,6 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import static java.util.Objects.requireNonNull;
-
/**
* {@link TransientTable} backed by a Java list. It will be automatically added to the
* current schema when {@link #scan(DataContext)} method gets called.
@@ -61,7 +59,9 @@ import static java.util.Objects.requireNonNull;
public class ListTransientTable extends AbstractQueryableTable
implements TransientTable, ModifiableTable, ScannableTable {
private static final Type TYPE = Object[].class;
+ @SuppressWarnings("rawtypes")
private final List rows = new ArrayList();
+ @SuppressWarnings({"unused", "FieldCanBeLocal"})
private final String name;
private final RelDataType protoRowType;
@@ -84,20 +84,19 @@ public class ListTransientTable extends AbstractQueryableTable
updateColumnList, sourceExpressionList, flattened);
}
+ @SuppressWarnings("rawtypes")
@Override public Collection getModifiableCollection() {
return rows;
}
@Override public Enumerable<@Nullable Object[]> scan(DataContext root) {
- // add the table into the schema, so that it is accessible by any potential operator
- requireNonNull(root.getRootSchema(), "root.getRootSchema()")
- .add(name, this);
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<@Nullable Object[]>() {
@Override public Enumerator<@Nullable Object[]> enumerator() {
return new Enumerator<@Nullable Object[]>() {
+ @SuppressWarnings({"rawtypes", "unchecked"})
private final List list = new ArrayList(rows);
private int i = -1;
@@ -129,7 +128,7 @@ public class ListTransientTable extends AbstractQueryableTable
}
@Override public Expression getExpression(SchemaPlus schema, String tableName,
- Class clazz) {
+ @SuppressWarnings("rawtypes") Class clazz) {
return Schemas.tableExpression(schema, elementType, tableName, clazz);
}
diff --git a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 9c178c0..1bcacf3 100644
--- a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -2709,7 +2709,7 @@ public class RelBuilder {
RelNode seed = tableSpool(Spool.Type.LAZY, Spool.Type.LAZY, finder.relOptTable).build();
RelNode repeatUnion =
struct.repeatUnionFactory.createRepeatUnion(seed, iterative, all,
- iterationLimit);
+ iterationLimit, finder.relOptTable);
return push(repeatUnion);
}
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 510baf1..e1089a8 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -106,6 +106,7 @@ import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlJsonConstructorNullClause;
import org.apache.calcite.sql.SqlJsonQueryEmptyOrErrorBehavior;
@@ -156,6 +157,8 @@ public enum BuiltInMethod {
REMOVE_ALL(ExtendedEnumerable.class, "removeAll", Collection.class),
SCHEMA_GET_SUB_SCHEMA(Schema.class, "getSubSchema", String.class),
SCHEMA_GET_TABLE(Schema.class, "getTable", String.class),
+ SCHEMA_PLUS_ADD_TABLE(SchemaPlus.class, "add", String.class, Table.class),
+ SCHEMA_PLUS_REMOVE_TABLE(SchemaPlus.class, "removeTable", String.class),
SCHEMA_PLUS_UNWRAP(SchemaPlus.class, "unwrap", Class.class),
SCHEMAS_ENUMERABLE_SCANNABLE(Schemas.class, "enumerable",
ScannableTable.class, DataContext.class),
@@ -240,7 +243,7 @@ public enum BuiltInMethod {
UNION(ExtendedEnumerable.class, "union", Enumerable.class),
CONCAT(ExtendedEnumerable.class, "concat", Enumerable.class),
REPEAT_UNION(EnumerableDefaults.class, "repeatUnion", Enumerable.class,
- Enumerable.class, int.class, boolean.class, EqualityComparer.class),
+ Enumerable.class, int.class, boolean.class, EqualityComparer.class, Function0.class),
MERGE_UNION(EnumerableDefaults.class, "mergeUnion", List.class, Function1.class,
Comparator.class, boolean.class, EqualityComparer.class),
LAZY_COLLECTION_SPOOL(EnumerableDefaults.class, "lazyCollectionSpool", Collection.class,
diff --git a/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableRepeatUnionTest.java b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableRepeatUnionTest.java
index 733ba65..f4a3878 100644
--- a/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableRepeatUnionTest.java
+++ b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableRepeatUnionTest.java
@@ -17,12 +17,24 @@
package org.apache.calcite.test.enumerable;
import org.apache.calcite.adapter.enumerable.EnumerableRepeatUnion;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.calcite.rel.rules.JoinToCorrelateRule;
+import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.schemata.hr.HierarchySchema;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.function.Consumer;
/**
* Unit tests for {@link EnumerableRepeatUnion}.
@@ -227,4 +239,83 @@ class EnumerableRepeatUnionTest {
.returnsOrdered("i=1", "i=2", "i=null", "i=3", "i=2", "i=3", "i=3");
}
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-4054">[CALCITE-4054]
+ * RepeatUnion containing a Correlate with a transientScan on its RHS causes NPE</a>. */
+ @Test void testRepeatUnionWithCorrelateWithTransientScanOnItsRight() {
+ CalciteAssert.that()
+ .with(CalciteConnectionProperty.LEX, Lex.JAVA)
+ .with(CalciteConnectionProperty.FORCE_DECORRELATE, false)
+ .withSchema("s", new ReflectiveSchema(new HierarchySchema()))
+ .withHook(Hook.PLANNER, (Consumer<RelOptPlanner>) planner -> {
+ planner.addRule(JoinToCorrelateRule.Config.DEFAULT.toRule());
+ planner.removeRule(JoinCommuteRule.Config.DEFAULT.toRule());
+ planner.removeRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
+ planner.removeRule(EnumerableRules.ENUMERABLE_JOIN_RULE);
+ })
+ .withRel(builder -> {
+ builder
+ // WITH RECURSIVE delta(empid, name) as (
+ // SELECT empid, name FROM emps WHERE empid = 2
+ // UNION ALL
+ // SELECT e.empid, e.name FROM delta d
+ // JOIN hierarchies h ON d.empid = h.managerid
+ // JOIN emps e ON h.subordinateid = e.empid
+ // )
+ // SELECT empid, name FROM delta
+ .scan("s", "emps")
+ .filter(
+ builder.equals(
+ builder.field("empid"),
+ builder.literal(2)))
+ .project(
+ builder.field("emps", "empid"),
+ builder.field("emps", "name"))
+
+ .transientScan("#DELTA#");
+ RelNode transientScan = builder.build(); // pop the transientScan to use it later
+
+ builder
+ .scan("s", "hierarchies")
+ .push(transientScan) // use the transientScan as right input of the join
+ .join(
+ JoinRelType.INNER,
+ builder.equals(
+ builder.field(2, "#DELTA#", "empid"),
+ builder.field(2, "hierarchies", "managerid")))
+
+ .scan("s", "emps")
+ .join(
+ JoinRelType.INNER,
+ builder.equals(
+ builder.field(2, "hierarchies", "subordinateid"),
+ builder.field(2, "emps", "empid")))
+ .project(
+ builder.field("emps", "empid"),
+ builder.field("emps", "name"))
+ .repeatUnion("#DELTA#", true);
+ return builder.build();
+ })
+ .explainHookMatches(""
+ + "EnumerableRepeatUnion(all=[true])\n"
+ + " EnumerableTableSpool(readType=[LAZY], writeType=[LAZY], table=[[#DELTA#]])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=[2], expr#6=[=($t0, $t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n"
+ + " EnumerableTableSpool(readType=[LAZY], writeType=[LAZY], table=[[#DELTA#]])\n"
+ + " EnumerableCalc(expr#0..8=[{inputs}], empid=[$t4], name=[$t6])\n"
+ + " EnumerableCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])\n"
+ // It is important to have EnumerableCorrelate + #DELTA# table scan on its right
+ // to reproduce the issue CALCITE-4054
+ + " EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n"
+ + " EnumerableTableScan(table=[[s, hierarchies]])\n"
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[$cor0], expr#3=[$t2.managerid], expr#4=[=($t0, $t3)], proj#0..1=[{exprs}], $condition=[$t4])\n"
+ + " EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[#DELTA#]])\n"
+ + " EnumerableCalc(expr#0..4=[{inputs}], expr#5=[$cor1], expr#6=[$t5.subordinateid], expr#7=[=($t6, $t0)], proj#0..4=[{exprs}], $condition=[$t7])\n"
+ + " EnumerableTableScan(table=[[s, emps]])\n")
+ .returnsUnordered(""
+ + "empid=2; name=Emp2\n"
+ + "empid=3; name=Emp3\n"
+ + "empid=5; name=Emp5");
+ }
}
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
index 4f8271d..7febd11 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
@@ -4519,6 +4519,7 @@ public abstract class EnumerableDefaults {
* @param all whether duplicates will be considered or not
* @param comparer {@link EqualityComparer} to control duplicates,
* only used if {@code all} is {@code false}
+ * @param cleanUpFunction optional clean-up actions (e.g. delete temporary table)
* @param <TSource> record type
*/
@SuppressWarnings("unchecked")
@@ -4527,7 +4528,8 @@ public abstract class EnumerableDefaults {
Enumerable<TSource> iteration,
int iterationLimit,
boolean all,
- EqualityComparer<TSource> comparer) {
+ EqualityComparer<TSource> comparer,
+ @Nullable Function0<Boolean> cleanUpFunction) {
return new AbstractEnumerable<TSource>() {
@Override public Enumerator<TSource> enumerator() {
return new Enumerator<TSource>() {
@@ -4623,6 +4625,9 @@ public abstract class EnumerableDefaults {
}
@Override public void close() {
+ if (cleanUpFunction != null) {
+ cleanUpFunction.apply();
+ }
seedEnumerator.close();
if (iterativeEnumerator != null) {
iterativeEnumerator.close();