You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2018/01/10 19:26:43 UTC
[1/2] calcite git commit: [CALCITE-2127] In Interpreter, allow a node to have more than one consumer
Repository: calcite
Updated Branches:
refs/heads/master 2918b8fe5 -> c0f912ddf
[CALCITE-2127] In Interpreter, allow a node to have more than one consumer
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/7b85d445
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/7b85d445
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/7b85d445
Branch: refs/heads/master
Commit: 7b85d445462799926474945c6ab33b1cb8ce33cc
Parents: 2918b8f
Author: Julian Hyde <jh...@apache.org>
Authored: Mon Jan 8 23:21:54 2018 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Jan 9 23:01:18 2018 -0800
----------------------------------------------------------------------
.../enumerable/EnumerableInterpretable.java | 10 +-
.../calcite/interpreter/AbstractSingleNode.java | 6 +-
.../calcite/interpreter/AggregateNode.java | 6 +-
.../apache/calcite/interpreter/Bindables.java | 16 +-
.../apache/calcite/interpreter/Compiler.java | 70 +++++
.../apache/calcite/interpreter/FilterNode.java | 8 +-
.../calcite/interpreter/InterpretableRel.java | 6 +-
.../apache/calcite/interpreter/Interpreter.java | 289 ++++++++++++-------
.../apache/calcite/interpreter/JoinNode.java | 14 +-
.../org/apache/calcite/interpreter/Nodes.java | 32 +-
.../apache/calcite/interpreter/ProjectNode.java | 8 +-
.../apache/calcite/interpreter/SortNode.java | 4 +-
.../calcite/interpreter/TableScanNode.java | 64 ++--
.../apache/calcite/interpreter/UnionNode.java | 6 +-
.../apache/calcite/interpreter/ValuesNode.java | 12 +-
.../apache/calcite/interpreter/WindowNode.java | 4 +-
.../calcite/adapter/druid/DruidQuery.java | 2 +-
17 files changed, 346 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
index ef5af1e..17447bc 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
@@ -18,9 +18,9 @@ package org.apache.calcite.adapter.enumerable;
import org.apache.calcite.DataContext;
import org.apache.calcite.avatica.Helper;
+import org.apache.calcite.interpreter.Compiler;
import org.apache.calcite.interpreter.InterpretableConvention;
import org.apache.calcite.interpreter.InterpretableRel;
-import org.apache.calcite.interpreter.Interpreter;
import org.apache.calcite.interpreter.Node;
import org.apache.calcite.interpreter.Row;
import org.apache.calcite.interpreter.Sink;
@@ -79,7 +79,7 @@ public class EnumerableInterpretable extends ConverterImpl
final ArrayBindable arrayBindable = box(bindable);
final Enumerable<Object[]> enumerable =
arrayBindable.bind(implementor.dataContext);
- return new EnumerableNode(enumerable, implementor.interpreter, this);
+ return new EnumerableNode(enumerable, implementor.compiler, this);
}
public static Bindable toBindable(Map<String, Object> parameters,
@@ -186,10 +186,10 @@ public class EnumerableInterpretable extends ConverterImpl
private final Enumerable<Object[]> enumerable;
private final Sink sink;
- EnumerableNode(Enumerable<Object[]> enumerable,
- Interpreter interpreter, EnumerableInterpretable rel) {
+ EnumerableNode(Enumerable<Object[]> enumerable, Compiler compiler,
+ EnumerableInterpretable rel) {
this.enumerable = enumerable;
- this.sink = interpreter.sink(rel);
+ this.sink = compiler.sink(rel);
}
public void run() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
index d6174ba..0734305 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
@@ -28,10 +28,10 @@ abstract class AbstractSingleNode<T extends SingleRel> implements Node {
protected final Sink sink;
protected final T rel;
- AbstractSingleNode(Interpreter interpreter, T rel) {
+ AbstractSingleNode(Compiler compiler, T rel) {
this.rel = rel;
- this.source = interpreter.source(rel, 0);
- this.sink = interpreter.sink(rel);
+ this.source = compiler.source(rel, 0);
+ this.sink = compiler.sink(rel);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
index 47b933b..e2d1adc 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -63,9 +63,9 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
private final ImmutableList<AccumulatorFactory> accumulatorFactories;
private final DataContext dataContext;
- public AggregateNode(Interpreter interpreter, Aggregate rel) {
- super(interpreter, rel);
- this.dataContext = interpreter.getDataContext();
+ public AggregateNode(Compiler compiler, Aggregate rel) {
+ super(compiler, rel);
+ this.dataContext = compiler.getDataContext();
ImmutableBitSet union = ImmutableBitSet.of();
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
index c5287eb..a26325e 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
@@ -321,7 +321,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new FilterNode(implementor.interpreter, this);
+ return new FilterNode(implementor.compiler, this);
}
}
@@ -378,7 +378,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new ProjectNode(implementor.interpreter, this);
+ return new ProjectNode(implementor.compiler, this);
}
}
@@ -434,7 +434,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new SortNode(implementor.interpreter, this);
+ return new SortNode(implementor.compiler, this);
}
}
@@ -504,7 +504,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new JoinNode(implementor.interpreter, this);
+ return new JoinNode(implementor.compiler, this);
}
}
@@ -556,7 +556,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new UnionNode(implementor.interpreter, this);
+ return new UnionNode(implementor.compiler, this);
}
}
@@ -582,7 +582,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new ValuesNode(implementor.interpreter, this);
+ return new ValuesNode(implementor.compiler, this);
}
}
@@ -660,7 +660,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new AggregateNode(implementor.interpreter, this);
+ return new AggregateNode(implementor.compiler, this);
}
}
@@ -722,7 +722,7 @@ public class Bindables {
}
public Node implement(InterpreterImplementor implementor) {
- return new WindowNode(implementor.interpreter, this);
+ return new WindowNode(implementor.compiler, this);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Compiler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Compiler.java b/core/src/main/java/org/apache/calcite/interpreter/Compiler.java
new file mode 100644
index 0000000..cdd5c5e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/Compiler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Context while converting a tree of {@link RelNode} to a program
+ * that can be run by an {@link Interpreter}.
+ */
+public interface Compiler {
+
+ /** Compiles an expression to an executable form. */
+ Scalar compile(List<RexNode> nodes, RelDataType inputRowType);
+
+ RelDataType combinedRowType(List<RelNode> inputs);
+
+ Source source(RelNode rel, int ordinal);
+
+ /**
+ * Creates a Sink for a relational expression to write into.
+ *
+ * <p>This method is generally called from the constructor of a {@link Node}.
+ * But a constructor could instead call
+ * {@link #enumerable(RelNode, Enumerable)}.
+ *
+ * @param rel Relational expression
+ * @return Sink
+ */
+ Sink sink(RelNode rel);
+
+ /** Tells the interpreter that a given relational expression wishes to
+ * give its output as an enumerable.
+ *
+ * <p>This is as opposed to the norm, where a relational expression calls
+ * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that
+ * sink.
+ *
+ * @param rel Relational expression
+ * @param rowEnumerable Contents of relational expression
+ */
+ void enumerable(RelNode rel, Enumerable<Row> rowEnumerable);
+
+ DataContext getDataContext();
+
+ Context createContext();
+
+}
+
+// End Compiler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
index 7d4ab3d..4f3fb7f 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
@@ -28,12 +28,12 @@ public class FilterNode extends AbstractSingleNode<Filter> {
private final Scalar condition;
private final Context context;
- public FilterNode(Interpreter interpreter, Filter rel) {
- super(interpreter, rel);
+ public FilterNode(Compiler compiler, Filter rel) {
+ super(compiler, rel);
this.condition =
- interpreter.compile(ImmutableList.of(rel.getCondition()),
+ compiler.compile(ImmutableList.of(rel.getCondition()),
rel.getRowType());
- this.context = interpreter.createContext();
+ this.context = compiler.createContext();
}
public void run() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
index 340299e..42276ac 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
@@ -35,17 +35,17 @@ public interface InterpretableRel extends RelNode {
/** Context when a {@link RelNode} is being converted to an interpreter
* {@link Node}. */
class InterpreterImplementor {
- public final Interpreter interpreter;
+ public final Compiler compiler;
public final Map<String, Object> internalParameters =
Maps.newLinkedHashMap();
public final CalcitePrepare.SparkHandler spark;
public final DataContext dataContext;
public final Map<RelNode, List<Sink>> relSinks = Maps.newHashMap();
- public InterpreterImplementor(Interpreter interpreter,
+ public InterpreterImplementor(Compiler compiler,
CalcitePrepare.SparkHandler spark,
DataContext dataContext) {
- this.interpreter = interpreter;
+ this.compiler = compiler;
this.spark = spark;
this.dataContext = dataContext;
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index 9360294..6096630 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -21,7 +21,9 @@ import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.linq4j.TransformedEnumerator;
+import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
@@ -38,16 +40,27 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.Pair;
import org.apache.calcite.util.ReflectUtil;
import org.apache.calcite.util.ReflectiveVisitDispatcher;
import org.apache.calcite.util.ReflectiveVisitor;
+import org.apache.calcite.util.Util;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import java.math.BigDecimal;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -62,20 +75,19 @@ import java.util.NoSuchElementException;
*/
public class Interpreter extends AbstractEnumerable<Object[]>
implements AutoCloseable {
- final Map<RelNode, NodeInfo> nodes = Maps.newLinkedHashMap();
+ private final Map<RelNode, NodeInfo> nodes;
private final DataContext dataContext;
private final RelNode rootRel;
- private final Map<RelNode, List<RelNode>> relInputs = Maps.newHashMap();
- protected final ScalarCompiler scalarCompiler;
/** Creates an Interpreter. */
public Interpreter(DataContext dataContext, RelNode rootRel) {
this.dataContext = Preconditions.checkNotNull(dataContext);
- this.scalarCompiler =
- new JaninoRexCompiler(rootRel.getCluster().getRexBuilder());
final RelNode rel = optimize(rootRel);
- final Compiler compiler = new Nodes.CoreCompiler(this);
- this.rootRel = compiler.visitRoot(rel);
+ final CompilerImpl compiler =
+ new Nodes.CoreCompiler(this, rootRel.getCluster());
+ Pair<RelNode, Map<RelNode, NodeInfo>> pair = compiler.visitRoot(rel);
+ this.rootRel = pair.left;
+ this.nodes = ImmutableMap.copyOf(pair.right);
}
private RelNode optimize(RelNode rootRel) {
@@ -98,7 +110,8 @@ public class Interpreter extends AbstractEnumerable<Object[]>
if (nodeInfo.rowEnumerable != null) {
rows = nodeInfo.rowEnumerable.enumerator();
} else {
- final ArrayDeque<Row> queue = ((ListSink) nodeInfo.sink).list;
+ final ArrayDeque<Row> queue =
+ Iterables.getOnlyElement(nodeInfo.sinks.values()).list;
rows = Linq4j.iterableEnumerator(queue);
}
@@ -124,23 +137,6 @@ public class Interpreter extends AbstractEnumerable<Object[]>
public void close() {
}
- /** Compiles an expression to an executable form. */
- public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
- if (inputRowType == null) {
- inputRowType = dataContext.getTypeFactory().builder().build();
- }
- return scalarCompiler.compile(nodes, inputRowType);
- }
-
- RelDataType combinedRowType(List<RelNode> inputs) {
- final RelDataTypeFactory.Builder builder =
- dataContext.getTypeFactory().builder();
- for (RelNode input : inputs) {
- builder.addAll(input.getRowType().getFieldList());
- }
- return builder.build();
- }
-
/** Not used. */
private class FooCompiler implements ScalarCompiler {
public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
@@ -249,85 +245,16 @@ public class Interpreter extends AbstractEnumerable<Object[]>
}
}
- public Source source(RelNode rel, int ordinal) {
- final RelNode input = getInput(rel, ordinal);
- final NodeInfo nodeInfo = nodes.get(input);
- if (nodeInfo == null) {
- throw new AssertionError("should be registered: " + rel);
- }
- if (nodeInfo.rowEnumerable != null) {
- return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator());
- }
- Sink sink = nodeInfo.sink;
- if (sink instanceof ListSink) {
- return new ListSource((ListSink) nodeInfo.sink);
- }
- throw new IllegalStateException(
- "Got a sink " + sink + " to which there is no match source type!");
- }
-
- private RelNode getInput(RelNode rel, int ordinal) {
- final List<RelNode> inputs = relInputs.get(rel);
- if (inputs != null) {
- return inputs.get(ordinal);
- }
- return rel.getInput(ordinal);
- }
-
- /**
- * Creates a Sink for a relational expression to write into.
- *
- * <p>This method is generally called from the constructor of a {@link Node}.
- * But a constructor could instead call
- * {@link #enumerable(RelNode, Enumerable)}.
- *
- * @param rel Relational expression
- * @return Sink
- */
- public Sink sink(RelNode rel) {
- final ArrayDeque<Row> queue = new ArrayDeque<>(1);
- final Sink sink = new ListSink(queue);
- NodeInfo nodeInfo = new NodeInfo(rel, sink, null);
- nodes.put(rel, nodeInfo);
- return sink;
- }
-
- /** Tells the interpreter that a given relational expression wishes to
- * give its output as an enumerable.
- *
- * <p>This is as opposed to the norm, where a relational expression calls
- * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that
- * sink.
- *
- * @param rel Relational expression
- * @param rowEnumerable Contents of relational expression
- */
- public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) {
- NodeInfo nodeInfo = new NodeInfo(rel, null, rowEnumerable);
- nodes.put(rel, nodeInfo);
- }
-
- public Context createContext() {
- return new Context(dataContext);
- }
-
- public DataContext getDataContext() {
- return dataContext;
- }
-
/** Information about a node registered in the data flow graph. */
private static class NodeInfo {
final RelNode rel;
- final Sink sink;
+ final Map<Edge, ListSink> sinks = new LinkedHashMap<>();
final Enumerable<Row> rowEnumerable;
Node node;
- NodeInfo(RelNode rel, Sink sink, Enumerable<Row> rowEnumerable) {
+ NodeInfo(RelNode rel, Enumerable<Row> rowEnumerable) {
this.rel = rel;
- this.sink = sink;
this.rowEnumerable = rowEnumerable;
- Preconditions.checkArgument((sink == null) != (rowEnumerable == null),
- "one or the other");
}
}
@@ -386,15 +313,20 @@ public class Interpreter extends AbstractEnumerable<Object[]>
/** Implementation of {@link Source} using a {@link java.util.ArrayDeque}. */
private static class ListSource implements Source {
private final ArrayDeque<Row> list;
+ private Iterator<Row> iterator = null;
- ListSource(ListSink sink) {
- this.list = sink.list;
+ ListSource(ArrayDeque<Row> list) {
+ this.list = list;
}
public Row receive() {
try {
- return list.remove();
+ if (iterator == null) {
+ iterator = list.iterator();
+ }
+ return iterator.next();
} catch (NoSuchElementException e) {
+ iterator = null;
return null;
}
}
@@ -404,6 +336,35 @@ public class Interpreter extends AbstractEnumerable<Object[]>
}
}
+ /** Implementation of {@link Sink} using a {@link java.util.ArrayDeque}. */
+ private static class DuplicatingSink implements Sink {
+ private List<ArrayDeque<Row>> queues;
+
+ private DuplicatingSink(List<ArrayDeque<Row>> queues) {
+ this.queues = ImmutableList.copyOf(queues);
+ }
+
+ public void send(Row row) throws InterruptedException {
+ for (ArrayDeque<Row> queue : queues) {
+ queue.add(row);
+ }
+ }
+
+ public void end() throws InterruptedException {
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override public void setSourceEnumerable(Enumerable<Row> enumerable)
+ throws InterruptedException {
+ // just copy over the source into the local list
+ final Enumerator<Row> enumerator = enumerable.enumerator();
+ while (enumerator.moveNext()) {
+ this.send(enumerator.current());
+ }
+ enumerator.close();
+ }
+ }
+
/**
* Walks over a tree of {@link org.apache.calcite.rel.RelNode} and, for each,
* creates a {@link org.apache.calcite.interpreter.Node} that can be
@@ -417,25 +378,32 @@ public class Interpreter extends AbstractEnumerable<Object[]>
* "visit" methods in this or a sub-class, and they will be found and called
* via reflection.
*/
- public static class Compiler extends RelVisitor implements ReflectiveVisitor {
- private final ReflectiveVisitDispatcher<Compiler, RelNode> dispatcher =
- ReflectUtil.createDispatcher(Compiler.class, RelNode.class);
+ static class CompilerImpl extends RelVisitor
+ implements Compiler, ReflectiveVisitor {
+ final ScalarCompiler scalarCompiler;
+ private final ReflectiveVisitDispatcher<CompilerImpl, RelNode> dispatcher =
+ ReflectUtil.createDispatcher(CompilerImpl.class, RelNode.class);
protected final Interpreter interpreter;
protected RelNode rootRel;
protected RelNode rel;
protected Node node;
+ final Map<RelNode, NodeInfo> nodes = new LinkedHashMap<>();
+ final Map<RelNode, List<RelNode>> relInputs = new HashMap<>();
+ final Multimap<RelNode, Edge> outEdges = LinkedHashMultimap.create();
private static final String REWRITE_METHOD_NAME = "rewrite";
private static final String VISIT_METHOD_NAME = "visit";
- Compiler(Interpreter interpreter) {
+ CompilerImpl(Interpreter interpreter, RelOptCluster cluster) {
this.interpreter = interpreter;
+ this.scalarCompiler = new JaninoRexCompiler(cluster.getRexBuilder());
}
- public RelNode visitRoot(RelNode p) {
+ /** Visits the tree, starting from the root {@code p}. */
+ Pair<RelNode, Map<RelNode, NodeInfo>> visitRoot(RelNode p) {
rootRel = p;
visit(p, 0, null);
- return rootRel;
+ return Pair.of(rootRel, nodes);
}
@Override public void visit(RelNode p, int ordinal, RelNode parent) {
@@ -454,10 +422,10 @@ public class Interpreter extends AbstractEnumerable<Object[]>
}
p = rel;
if (parent != null) {
- List<RelNode> inputs = interpreter.relInputs.get(parent);
+ List<RelNode> inputs = relInputs.get(parent);
if (inputs == null) {
inputs = Lists.newArrayList(parent.getInputs());
- interpreter.relInputs.put(parent, inputs);
+ relInputs.put(parent, inputs);
}
inputs.set(ordinal, p);
} else {
@@ -466,7 +434,10 @@ public class Interpreter extends AbstractEnumerable<Object[]>
}
// rewrite children first (from left to right)
- final List<RelNode> inputs = interpreter.relInputs.get(p);
+ final List<RelNode> inputs = relInputs.get(p);
+ for (Ord<RelNode> input : Ord.zip(Util.first(inputs, p.getInputs()))) {
+ outEdges.put(input.e, new Edge(p, input.i));
+ }
if (inputs != null) {
for (int i = 0; i < inputs.size(); i++) {
RelNode input = inputs.get(i);
@@ -482,17 +453,22 @@ public class Interpreter extends AbstractEnumerable<Object[]>
if (p instanceof InterpretableRel) {
InterpretableRel interpretableRel = (InterpretableRel) p;
node = interpretableRel.implement(
- new InterpretableRel.InterpreterImplementor(interpreter, null,
- null));
+ new InterpretableRel.InterpreterImplementor(this, null, null));
} else {
// Probably need to add a visit(XxxRel) method to CoreCompiler.
throw new AssertionError("interpreter: no implementation for "
+ p.getClass());
}
}
- final NodeInfo nodeInfo = interpreter.nodes.get(p);
+ final NodeInfo nodeInfo = nodes.get(p);
assert nodeInfo != null;
nodeInfo.node = node;
+ if (inputs != null) {
+ for (int i = 0; i < inputs.size(); i++) {
+ final RelNode input = inputs.get(i);
+ visit(input, i, p);
+ }
+ }
}
/** Fallback rewrite method.
@@ -502,6 +478,95 @@ public class Interpreter extends AbstractEnumerable<Object[]>
* rewrite. */
public void rewrite(RelNode r) {
}
+
+ public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
+ if (inputRowType == null) {
+ inputRowType = interpreter.dataContext.getTypeFactory().builder()
+ .build();
+ }
+ return scalarCompiler.compile(nodes, inputRowType);
+ }
+
+ public RelDataType combinedRowType(List<RelNode> inputs) {
+ final RelDataTypeFactory.Builder builder =
+ interpreter.dataContext.getTypeFactory().builder();
+ for (RelNode input : inputs) {
+ builder.addAll(input.getRowType().getFieldList());
+ }
+ return builder.build();
+ }
+
+ public Source source(RelNode rel, int ordinal) {
+ final RelNode input = getInput(rel, ordinal);
+ final Edge edge = new Edge(rel, ordinal);
+ final Collection<Edge> edges = outEdges.get(input);
+ final NodeInfo nodeInfo = nodes.get(input);
+ if (nodeInfo == null) {
+ throw new AssertionError("should be registered: " + rel);
+ }
+ if (nodeInfo.rowEnumerable != null) {
+ return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator());
+ }
+ assert nodeInfo.sinks.size() == edges.size();
+ final ListSink sink = nodeInfo.sinks.get(edge);
+ if (sink != null) {
+ return new ListSource(sink.list);
+ }
+ throw new IllegalStateException(
+ "Got a sink " + sink + " to which there is no match source type!");
+ }
+
+ private RelNode getInput(RelNode rel, int ordinal) {
+ final List<RelNode> inputs = relInputs.get(rel);
+ if (inputs != null) {
+ return inputs.get(ordinal);
+ }
+ return rel.getInput(ordinal);
+ }
+
+ public Sink sink(RelNode rel) {
+ final Collection<Edge> edges = outEdges.get(rel);
+ final Collection<Edge> edges2 = edges.isEmpty()
+ ? ImmutableList.of(new Edge(null, 0))
+ : edges;
+ NodeInfo nodeInfo = nodes.get(rel);
+ if (nodeInfo == null) {
+ nodeInfo = new NodeInfo(rel, null);
+ nodes.put(rel, nodeInfo);
+ for (Edge edge : edges2) {
+ nodeInfo.sinks.put(edge, new ListSink(new ArrayDeque<Row>()));
+ }
+ }
+ if (edges.size() == 1) {
+ return Iterables.getOnlyElement(nodeInfo.sinks.values());
+ } else {
+ final List<ArrayDeque<Row>> queues = new ArrayList<>();
+ for (ListSink sink : nodeInfo.sinks.values()) {
+ queues.add(sink.list);
+ }
+ return new DuplicatingSink(queues);
+ }
+ }
+
+ public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) {
+ NodeInfo nodeInfo = new NodeInfo(rel, rowEnumerable);
+ nodes.put(rel, nodeInfo);
+ }
+
+ public Context createContext() {
+ return new Context(getDataContext());
+ }
+
+ public DataContext getDataContext() {
+ return interpreter.dataContext;
+ }
+ }
+
+ /** Edge between a {@link RelNode} and one of its inputs. */
+ static class Edge extends Pair<RelNode, Integer> {
+ Edge(RelNode parent, int ordinal) {
+ super(parent, ordinal);
+ }
}
/** Converts a list of expressions to a scalar that can compute their
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
index 6e2ae15..349e26f 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
@@ -35,14 +35,14 @@ public class JoinNode implements Node {
private final Scalar condition;
private final Context context;
- public JoinNode(Interpreter interpreter, Join rel) {
- this.leftSource = interpreter.source(rel, 0);
- this.rightSource = interpreter.source(rel, 1);
- this.sink = interpreter.sink(rel);
- this.condition = interpreter.compile(ImmutableList.of(rel.getCondition()),
- interpreter.combinedRowType(rel.getInputs()));
+ public JoinNode(Compiler compiler, Join rel) {
+ this.leftSource = compiler.source(rel, 0);
+ this.rightSource = compiler.source(rel, 1);
+ this.sink = compiler.sink(rel);
+ this.condition = compiler.compile(ImmutableList.of(rel.getCondition()),
+ compiler.combinedRowType(rel.getInputs()));
this.rel = rel;
- this.context = interpreter.createContext();
+ this.context = compiler.createContext();
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
index 19f397a..b6f1b08 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.interpreter;
+import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
@@ -35,54 +36,53 @@ import com.google.common.collect.ImmutableList;
*/
public class Nodes {
/** Extension to
- * {@link org.apache.calcite.interpreter.Interpreter.Compiler}
+ * {@link Interpreter.CompilerImpl}
* that knows how to handle the core logical
* {@link org.apache.calcite.rel.RelNode}s. */
- public static class CoreCompiler extends Interpreter.Compiler {
- CoreCompiler(Interpreter interpreter) {
- super(interpreter);
+ public static class CoreCompiler extends Interpreter.CompilerImpl {
+ CoreCompiler(Interpreter interpreter, RelOptCluster cluster) {
+ super(interpreter, cluster);
}
public void visit(Aggregate agg) {
- node = new AggregateNode(interpreter, agg);
+ node = new AggregateNode(this, agg);
}
public void visit(Filter filter) {
- node = new FilterNode(interpreter, filter);
+ node = new FilterNode(this, filter);
}
public void visit(Project project) {
- node = new ProjectNode(interpreter, project);
+ node = new ProjectNode(this, project);
}
public void visit(Values value) {
- node = new ValuesNode(interpreter, value);
+ node = new ValuesNode(this, value);
}
public void visit(TableScan scan) {
- node = TableScanNode.create(interpreter, scan,
- ImmutableList.<RexNode>of(), null);
+ final ImmutableList<RexNode> filters = ImmutableList.of();
+ node = TableScanNode.create(this, scan, filters, null);
}
public void visit(Bindables.BindableTableScan scan) {
- node = TableScanNode.create(interpreter, scan, scan.filters,
- scan.projects);
+ node = TableScanNode.create(this, scan, scan.filters, scan.projects);
}
public void visit(Sort sort) {
- node = new SortNode(interpreter, sort);
+ node = new SortNode(this, sort);
}
public void visit(Union union) {
- node = new UnionNode(interpreter, union);
+ node = new UnionNode(this, union);
}
public void visit(Join join) {
- node = new JoinNode(interpreter, join);
+ node = new JoinNode(this, join);
}
public void visit(Window window) {
- node = new WindowNode(interpreter, window);
+ node = new WindowNode(this, window);
}
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
index 4c280b7..2503b1cd 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
@@ -27,12 +27,12 @@ public class ProjectNode extends AbstractSingleNode<Project> {
private final Context context;
private final int projectCount;
- public ProjectNode(Interpreter interpreter, Project rel) {
- super(interpreter, rel);
+ public ProjectNode(Compiler compiler, Project rel) {
+ super(compiler, rel);
this.projectCount = rel.getProjects().size();
- this.scalar = interpreter.compile(rel.getProjects(),
+ this.scalar = compiler.compile(rel.getProjects(),
rel.getInput().getRowType());
- this.context = interpreter.createContext();
+ this.context = compiler.createContext();
}
public void run() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
index 5eaaaf6..dff97a6 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
@@ -34,8 +34,8 @@ import java.util.List;
* {@link org.apache.calcite.rel.core.Sort}.
*/
public class SortNode extends AbstractSingleNode<Sort> {
- public SortNode(Interpreter interpreter, Sort rel) {
- super(interpreter, rel);
+ public SortNode(Compiler compiler, Sort rel) {
+ super(compiler, rel);
}
public void run() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
index 9cec9e6..9c8124e 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -59,12 +59,12 @@ import static org.apache.calcite.util.Static.RESOURCE;
* {@link org.apache.calcite.rel.core.TableScan}.
*/
public class TableScanNode implements Node {
- private TableScanNode(Interpreter interpreter, TableScan rel,
+ private TableScanNode(Compiler compiler, TableScan rel,
Enumerable<Row> enumerable) {
- interpreter.enumerable(rel, enumerable);
+ compiler.enumerable(rel, enumerable);
}
- public void run() throws InterruptedException {
+ public void run() {
// nothing to do
}
@@ -73,56 +73,56 @@ public class TableScanNode implements Node {
* <p>Tries various table SPIs, and negotiates with the table which filters
* and projects it can implement. Adds to the Enumerable implementations of
* any filters and projects that cannot be implemented by the table. */
- static TableScanNode create(Interpreter interpreter, TableScan rel,
+ static TableScanNode create(Compiler compiler, TableScan rel,
ImmutableList<RexNode> filters, ImmutableIntList projects) {
final RelOptTable relOptTable = rel.getTable();
final ProjectableFilterableTable pfTable =
relOptTable.unwrap(ProjectableFilterableTable.class);
if (pfTable != null) {
- return createProjectableFilterable(interpreter, rel, filters, projects,
+ return createProjectableFilterable(compiler, rel, filters, projects,
pfTable);
}
final FilterableTable filterableTable =
relOptTable.unwrap(FilterableTable.class);
if (filterableTable != null) {
- return createFilterable(interpreter, rel, filters, projects,
+ return createFilterable(compiler, rel, filters, projects,
filterableTable);
}
final ScannableTable scannableTable =
relOptTable.unwrap(ScannableTable.class);
if (scannableTable != null) {
- return createScannable(interpreter, rel, filters, projects,
+ return createScannable(compiler, rel, filters, projects,
scannableTable);
}
//noinspection unchecked
final Enumerable<Row> enumerable = relOptTable.unwrap(Enumerable.class);
if (enumerable != null) {
- return createEnumerable(interpreter, rel, enumerable, null, filters,
+ return createEnumerable(compiler, rel, enumerable, null, filters,
projects);
}
final QueryableTable queryableTable =
relOptTable.unwrap(QueryableTable.class);
if (queryableTable != null) {
- return createQueryable(interpreter, rel, filters, projects,
+ return createQueryable(compiler, rel, filters, projects,
queryableTable);
}
throw new AssertionError("cannot convert table " + relOptTable
+ " to enumerable");
}
- private static TableScanNode createScannable(Interpreter interpreter,
- TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
+ private static TableScanNode createScannable(Compiler compiler, TableScan rel,
+ ImmutableList<RexNode> filters, ImmutableIntList projects,
ScannableTable scannableTable) {
final Enumerable<Row> rowEnumerable =
- Enumerables.toRow(scannableTable.scan(interpreter.getDataContext()));
- return createEnumerable(interpreter, rel, rowEnumerable, null, filters,
+ Enumerables.toRow(scannableTable.scan(compiler.getDataContext()));
+ return createEnumerable(compiler, rel, rowEnumerable, null, filters,
projects);
}
- private static TableScanNode createQueryable(Interpreter interpreter,
+ private static TableScanNode createQueryable(Compiler compiler,
TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
QueryableTable queryableTable) {
- final DataContext root = interpreter.getDataContext();
+ final DataContext root = compiler.getDataContext();
final RelOptTable relOptTable = rel.getTable();
final Type elementType = queryableTable.getElementType();
SchemaPlus schema = root.getRootSchema();
@@ -163,14 +163,14 @@ public class TableScanNode implements Node {
rowEnumerable =
Schemas.queryable(root, Row.class, relOptTable.getQualifiedName());
}
- return createEnumerable(interpreter, rel, rowEnumerable, null, filters,
+ return createEnumerable(compiler, rel, rowEnumerable, null, filters,
projects);
}
- private static TableScanNode createFilterable(Interpreter interpreter,
+ private static TableScanNode createFilterable(Compiler compiler,
TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
FilterableTable filterableTable) {
- final DataContext root = interpreter.getDataContext();
+ final DataContext root = compiler.getDataContext();
final List<RexNode> mutableFilters = Lists.newArrayList(filters);
final Enumerable<Object[]> enumerable =
filterableTable.scan(root, mutableFilters);
@@ -180,14 +180,14 @@ public class TableScanNode implements Node {
}
}
final Enumerable<Row> rowEnumerable = Enumerables.toRow(enumerable);
- return createEnumerable(interpreter, rel, rowEnumerable, null,
+ return createEnumerable(compiler, rel, rowEnumerable, null,
mutableFilters, projects);
}
- private static TableScanNode createProjectableFilterable(
- Interpreter interpreter, TableScan rel, ImmutableList<RexNode> filters,
- ImmutableIntList projects, ProjectableFilterableTable pfTable) {
- final DataContext root = interpreter.getDataContext();
+ private static TableScanNode createProjectableFilterable(Compiler compiler,
+ TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
+ ProjectableFilterableTable pfTable) {
+ final DataContext root = compiler.getDataContext();
final ImmutableIntList originalProjects = projects;
for (;;) {
final List<RexNode> mutableFilters = Lists.newArrayList(filters);
@@ -234,19 +234,20 @@ public class TableScanNode implements Node {
// project the leading columns.
rejectedProjects = ImmutableIntList.identity(originalProjects.size());
}
- return createEnumerable(interpreter, rel, rowEnumerable, projects,
+ return createEnumerable(compiler, rel, rowEnumerable, projects,
mutableFilters, rejectedProjects);
}
}
- private static TableScanNode createEnumerable(
- Interpreter interpreter, TableScan rel,
- Enumerable<Row> enumerable, final ImmutableIntList acceptedProjects,
- List<RexNode> rejectedFilters, final ImmutableIntList rejectedProjects) {
+ private static TableScanNode createEnumerable(Compiler compiler,
+ TableScan rel, Enumerable<Row> enumerable,
+ final ImmutableIntList acceptedProjects, List<RexNode> rejectedFilters,
+ final ImmutableIntList rejectedProjects) {
if (!rejectedFilters.isEmpty()) {
final RexNode filter =
RexUtil.composeConjunction(rel.getCluster().getRexBuilder(),
rejectedFilters, false);
+ assert filter != null;
// Re-map filter for the projects that have been applied already
final RexNode filter2;
final RelDataType inputRowType;
@@ -267,9 +268,8 @@ public class TableScanNode implements Node {
inputRowType = builder.build();
}
final Scalar condition =
- interpreter.compile(
- ImmutableList.of(filter2), inputRowType);
- final Context context = interpreter.createContext();
+ compiler.compile(ImmutableList.of(filter2), inputRowType);
+ final Context context = compiler.createContext();
enumerable = enumerable.where(
new Predicate1<Row>() {
@Override public boolean apply(Row row) {
@@ -292,7 +292,7 @@ public class TableScanNode implements Node {
}
});
}
- return new TableScanNode(interpreter, rel, enumerable);
+ return new TableScanNode(compiler, rel, enumerable);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
index 701dbc7..0f6fe4f 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
@@ -32,13 +32,13 @@ public class UnionNode implements Node {
private final Sink sink;
private final Union rel;
- public UnionNode(Interpreter interpreter, Union rel) {
+ public UnionNode(Compiler compiler, Union rel) {
ImmutableList.Builder<Source> builder = ImmutableList.builder();
for (int i = 0; i < rel.getInputs().size(); i++) {
- builder.add(interpreter.source(rel, i));
+ builder.add(compiler.source(rel, i));
}
this.sources = builder.build();
- this.sink = interpreter.sink(rel);
+ this.sink = compiler.sink(rel);
this.rel = rel;
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
index 008d6f0..1c6f255 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
@@ -34,21 +34,21 @@ public class ValuesNode implements Node {
private final int fieldCount;
private final ImmutableList<Row> rows;
- public ValuesNode(Interpreter interpreter, Values rel) {
- this.sink = interpreter.sink(rel);
+ public ValuesNode(Compiler compiler, Values rel) {
+ this.sink = compiler.sink(rel);
this.fieldCount = rel.getRowType().getFieldCount();
- this.rows = createRows(interpreter, rel.getTuples());
+ this.rows = createRows(compiler, rel.getTuples());
}
- private ImmutableList<Row> createRows(Interpreter interpreter,
+ private ImmutableList<Row> createRows(Compiler compiler,
ImmutableList<ImmutableList<RexLiteral>> tuples) {
final List<RexNode> nodes = Lists.newArrayList();
for (ImmutableList<RexLiteral> tuple : tuples) {
nodes.addAll(tuple);
}
- final Scalar scalar = interpreter.compile(nodes, null);
+ final Scalar scalar = compiler.compile(nodes, null);
final Object[] values = new Object[nodes.size()];
- final Context context = interpreter.createContext();
+ final Context context = compiler.createContext();
scalar.execute(context, values);
final ImmutableList.Builder<Row> rows = ImmutableList.builder();
Object[] subValues = new Object[fieldCount];
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
index a03608c..eff4bea 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
@@ -23,8 +23,8 @@ import org.apache.calcite.rel.core.Window;
* {@link org.apache.calcite.rel.core.Window}.
*/
public class WindowNode extends AbstractSingleNode<Window> {
- WindowNode(Interpreter interpreter, Window rel) {
- super(interpreter, rel);
+ WindowNode(Compiler compiler, Window rel) {
+ super(compiler, rel);
}
public void run() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 2469841..8d3ea64 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -426,7 +426,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
}
@Override public Node implement(InterpreterImplementor implementor) {
- return new DruidQueryNode(implementor.interpreter, this);
+ return new DruidQueryNode(implementor.compiler, this);
}
public QuerySpec getQuerySpec() {
[2/2] calcite git commit: [CALCITE-2116] In HepPlanner,
ensure that common sub-expressions have the same digest (LeoWangLZ)
Posted by jh...@apache.org.
[CALCITE-2116] In HepPlanner, ensure that common sub-expressions have the same digest (LeoWangLZ)
Close apache/calcite#597
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/c0f912dd
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/c0f912dd
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/c0f912dd
Branch: refs/heads/master
Commit: c0f912ddff717eaedb9a2d652e95f232d10e207f
Parents: 7b85d44
Author: LeoWangLZ <wa...@163.com>
Authored: Fri Jan 5 10:15:24 2018 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Jan 9 23:01:29 2018 -0800
----------------------------------------------------------------------
.../org/apache/calcite/plan/hep/HepPlanner.java | 5 +++-
.../org/apache/calcite/test/HepPlannerTest.java | 25 ++++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/c0f912dd/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java b/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
index 657460a..d43bbd8 100644
--- a/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
@@ -811,6 +811,9 @@ public class HepPlanner extends AbstractRelOptPlanner {
rel = rel.copy(rel.getTraitSet(), newInputs);
onCopy(oldRel, rel);
}
+ // Compute digest first time we add to DAG,
+ // otherwise can't get equivVertex for common sub-expression
+ rel.recomputeDigest();
// try to find equivalent rel only if DAG is allowed
if (!noDAG) {
@@ -886,7 +889,7 @@ public class HepPlanner extends AbstractRelOptPlanner {
if (mapDigestToVertex.get(oldDigest) == vertex) {
mapDigestToVertex.remove(oldDigest);
}
- String newDigest = rel.recomputeDigest();
+ String newDigest = rel.getDigest();
// When a transformation happened in one rule apply, support
// vertex2 replace vertex1, but the current relNode of
// vertex1 and vertex2 is same,
http://git-wip-us.apache.org/repos/asf/calcite/blob/c0f912dd/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java b/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
index e3e144b..7a08d0c 100644
--- a/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalUnion;
@@ -248,6 +249,30 @@ public class HepPlannerTest extends RelOptTestBase {
+ " (select * from dept) d2");
}
+ /** Tests that if two relational expressions are equivalent, the planner
+ * notices, and only applies the rule once. */
+ @Test public void testCommonSubExpression() {
+ // In the following,
+ // (select 1 from dept where abs(-1)=20)
+ // occurs twice, but it's a common sub-expression, so the rule should only
+ // apply once.
+ HepProgramBuilder programBuilder = HepProgram.builder();
+ programBuilder.addRuleInstance(FilterToCalcRule.INSTANCE);
+
+ final HepTestListener listener = new HepTestListener(0);
+ HepPlanner planner = new HepPlanner(programBuilder.build());
+ planner.addListener(listener);
+
+ final String sql = "(select 1 from dept where abs(-1)=20)\n"
+ + "union all\n"
+ + "(select 1 from dept where abs(-1)=20)";
+ planner.setRoot(tester.convertSqlToRel(sql).rel);
+ RelNode bestRel = planner.findBestExp();
+
+ assertThat(bestRel.getInput(0).equals(bestRel.getInput(1)), is(true));
+ assertThat(listener.getApplyTimes() == 1, is(true));
+ }
+
@Test public void testSubprogram() throws Exception {
// Verify that subprogram gets re-executed until fixpoint.
// In this case, the first time through we limit it to generate