You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2018/01/10 19:26:43 UTC

[1/2] calcite git commit: [CALCITE-2127] In Interpreter, allow a node to have more than one consumer

Repository: calcite
Updated Branches:
  refs/heads/master 2918b8fe5 -> c0f912ddf


[CALCITE-2127] In Interpreter, allow a node to have more than one consumer


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/7b85d445
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/7b85d445
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/7b85d445

Branch: refs/heads/master
Commit: 7b85d445462799926474945c6ab33b1cb8ce33cc
Parents: 2918b8f
Author: Julian Hyde <jh...@apache.org>
Authored: Mon Jan 8 23:21:54 2018 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Jan 9 23:01:18 2018 -0800

----------------------------------------------------------------------
 .../enumerable/EnumerableInterpretable.java     |  10 +-
 .../calcite/interpreter/AbstractSingleNode.java |   6 +-
 .../calcite/interpreter/AggregateNode.java      |   6 +-
 .../apache/calcite/interpreter/Bindables.java   |  16 +-
 .../apache/calcite/interpreter/Compiler.java    |  70 +++++
 .../apache/calcite/interpreter/FilterNode.java  |   8 +-
 .../calcite/interpreter/InterpretableRel.java   |   6 +-
 .../apache/calcite/interpreter/Interpreter.java | 289 ++++++++++++-------
 .../apache/calcite/interpreter/JoinNode.java    |  14 +-
 .../org/apache/calcite/interpreter/Nodes.java   |  32 +-
 .../apache/calcite/interpreter/ProjectNode.java |   8 +-
 .../apache/calcite/interpreter/SortNode.java    |   4 +-
 .../calcite/interpreter/TableScanNode.java      |  64 ++--
 .../apache/calcite/interpreter/UnionNode.java   |   6 +-
 .../apache/calcite/interpreter/ValuesNode.java  |  12 +-
 .../apache/calcite/interpreter/WindowNode.java  |   4 +-
 .../calcite/adapter/druid/DruidQuery.java       |   2 +-
 17 files changed, 346 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
index ef5af1e..17447bc 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java
@@ -18,9 +18,9 @@ package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.DataContext;
 import org.apache.calcite.avatica.Helper;
+import org.apache.calcite.interpreter.Compiler;
 import org.apache.calcite.interpreter.InterpretableConvention;
 import org.apache.calcite.interpreter.InterpretableRel;
-import org.apache.calcite.interpreter.Interpreter;
 import org.apache.calcite.interpreter.Node;
 import org.apache.calcite.interpreter.Row;
 import org.apache.calcite.interpreter.Sink;
@@ -79,7 +79,7 @@ public class EnumerableInterpretable extends ConverterImpl
     final ArrayBindable arrayBindable = box(bindable);
     final Enumerable<Object[]> enumerable =
         arrayBindable.bind(implementor.dataContext);
-    return new EnumerableNode(enumerable, implementor.interpreter, this);
+    return new EnumerableNode(enumerable, implementor.compiler, this);
   }
 
   public static Bindable toBindable(Map<String, Object> parameters,
@@ -186,10 +186,10 @@ public class EnumerableInterpretable extends ConverterImpl
     private final Enumerable<Object[]> enumerable;
     private final Sink sink;
 
-    EnumerableNode(Enumerable<Object[]> enumerable,
-        Interpreter interpreter, EnumerableInterpretable rel) {
+    EnumerableNode(Enumerable<Object[]> enumerable, Compiler compiler,
+        EnumerableInterpretable rel) {
       this.enumerable = enumerable;
-      this.sink = interpreter.sink(rel);
+      this.sink = compiler.sink(rel);
     }
 
     public void run() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
index d6174ba..0734305 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
@@ -28,10 +28,10 @@ abstract class AbstractSingleNode<T extends SingleRel> implements Node {
   protected final Sink sink;
   protected final T rel;
 
-  AbstractSingleNode(Interpreter interpreter, T rel) {
+  AbstractSingleNode(Compiler compiler, T rel) {
     this.rel = rel;
-    this.source = interpreter.source(rel, 0);
-    this.sink = interpreter.sink(rel);
+    this.source = compiler.source(rel, 0);
+    this.sink = compiler.sink(rel);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
index 47b933b..e2d1adc 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -63,9 +63,9 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
   private final ImmutableList<AccumulatorFactory> accumulatorFactories;
   private final DataContext dataContext;
 
-  public AggregateNode(Interpreter interpreter, Aggregate rel) {
-    super(interpreter, rel);
-    this.dataContext = interpreter.getDataContext();
+  public AggregateNode(Compiler compiler, Aggregate rel) {
+    super(compiler, rel);
+    this.dataContext = compiler.getDataContext();
 
     ImmutableBitSet union = ImmutableBitSet.of();
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
index c5287eb..a26325e 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
@@ -321,7 +321,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new FilterNode(implementor.interpreter, this);
+      return new FilterNode(implementor.compiler, this);
     }
   }
 
@@ -378,7 +378,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new ProjectNode(implementor.interpreter, this);
+      return new ProjectNode(implementor.compiler, this);
     }
   }
 
@@ -434,7 +434,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new SortNode(implementor.interpreter, this);
+      return new SortNode(implementor.compiler, this);
     }
   }
 
@@ -504,7 +504,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new JoinNode(implementor.interpreter, this);
+      return new JoinNode(implementor.compiler, this);
     }
   }
 
@@ -556,7 +556,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new UnionNode(implementor.interpreter, this);
+      return new UnionNode(implementor.compiler, this);
     }
   }
 
@@ -582,7 +582,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new ValuesNode(implementor.interpreter, this);
+      return new ValuesNode(implementor.compiler, this);
     }
   }
 
@@ -660,7 +660,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new AggregateNode(implementor.interpreter, this);
+      return new AggregateNode(implementor.compiler, this);
     }
   }
 
@@ -722,7 +722,7 @@ public class Bindables {
     }
 
     public Node implement(InterpreterImplementor implementor) {
-      return new WindowNode(implementor.interpreter, this);
+      return new WindowNode(implementor.compiler, this);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Compiler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Compiler.java b/core/src/main/java/org/apache/calcite/interpreter/Compiler.java
new file mode 100644
index 0000000..cdd5c5e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/Compiler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Context while converting a tree of {@link RelNode} to a program
+ * that can be run by an {@link Interpreter}.
+ */
+public interface Compiler {
+
+  /** Compiles an expression to an executable form. */
+  Scalar compile(List<RexNode> nodes, RelDataType inputRowType);
+
+  RelDataType combinedRowType(List<RelNode> inputs);
+
+  Source source(RelNode rel, int ordinal);
+
+  /**
+   * Creates a Sink for a relational expression to write into.
+   *
+   * <p>This method is generally called from the constructor of a {@link Node}.
+   * But a constructor could instead call
+   * {@link #enumerable(RelNode, Enumerable)}.
+   *
+   * @param rel Relational expression
+   * @return Sink
+   */
+  Sink sink(RelNode rel);
+
+  /** Tells the interpreter that a given relational expression wishes to
+   * give its output as an enumerable.
+   *
+   * <p>This is as opposed to the norm, where a relational expression calls
+   * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that
+   * sink.
+   *
+   * @param rel Relational expression
+   * @param rowEnumerable Contents of relational expression
+   */
+  void enumerable(RelNode rel, Enumerable<Row> rowEnumerable);
+
+  DataContext getDataContext();
+
+  Context createContext();
+
+}
+
+// End Compiler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
index 7d4ab3d..4f3fb7f 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
@@ -28,12 +28,12 @@ public class FilterNode extends AbstractSingleNode<Filter> {
   private final Scalar condition;
   private final Context context;
 
-  public FilterNode(Interpreter interpreter, Filter rel) {
-    super(interpreter, rel);
+  public FilterNode(Compiler compiler, Filter rel) {
+    super(compiler, rel);
     this.condition =
-        interpreter.compile(ImmutableList.of(rel.getCondition()),
+        compiler.compile(ImmutableList.of(rel.getCondition()),
             rel.getRowType());
-    this.context = interpreter.createContext();
+    this.context = compiler.createContext();
   }
 
   public void run() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
index 340299e..42276ac 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
@@ -35,17 +35,17 @@ public interface InterpretableRel extends RelNode {
   /** Context when a {@link RelNode} is being converted to an interpreter
    * {@link Node}. */
   class InterpreterImplementor {
-    public final Interpreter interpreter;
+    public final Compiler compiler;
     public final Map<String, Object> internalParameters =
         Maps.newLinkedHashMap();
     public final CalcitePrepare.SparkHandler spark;
     public final DataContext dataContext;
     public final Map<RelNode, List<Sink>> relSinks = Maps.newHashMap();
 
-    public InterpreterImplementor(Interpreter interpreter,
+    public InterpreterImplementor(Compiler compiler,
         CalcitePrepare.SparkHandler spark,
         DataContext dataContext) {
-      this.interpreter = interpreter;
+      this.compiler = compiler;
       this.spark = spark;
       this.dataContext = dataContext;
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index 9360294..6096630 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -21,7 +21,9 @@ import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.linq4j.TransformedEnumerator;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
@@ -38,16 +40,27 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitDispatcher;
 import org.apache.calcite.util.ReflectiveVisitor;
+import org.apache.calcite.util.Util;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 import java.math.BigDecimal;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -62,20 +75,19 @@ import java.util.NoSuchElementException;
  */
 public class Interpreter extends AbstractEnumerable<Object[]>
     implements AutoCloseable {
-  final Map<RelNode, NodeInfo> nodes = Maps.newLinkedHashMap();
+  private final Map<RelNode, NodeInfo> nodes;
   private final DataContext dataContext;
   private final RelNode rootRel;
-  private final Map<RelNode, List<RelNode>> relInputs = Maps.newHashMap();
-  protected final ScalarCompiler scalarCompiler;
 
   /** Creates an Interpreter. */
   public Interpreter(DataContext dataContext, RelNode rootRel) {
     this.dataContext = Preconditions.checkNotNull(dataContext);
-    this.scalarCompiler =
-        new JaninoRexCompiler(rootRel.getCluster().getRexBuilder());
     final RelNode rel = optimize(rootRel);
-    final Compiler compiler = new Nodes.CoreCompiler(this);
-    this.rootRel = compiler.visitRoot(rel);
+    final CompilerImpl compiler =
+        new Nodes.CoreCompiler(this, rootRel.getCluster());
+    Pair<RelNode, Map<RelNode, NodeInfo>> pair = compiler.visitRoot(rel);
+    this.rootRel = pair.left;
+    this.nodes = ImmutableMap.copyOf(pair.right);
   }
 
   private RelNode optimize(RelNode rootRel) {
@@ -98,7 +110,8 @@ public class Interpreter extends AbstractEnumerable<Object[]>
     if (nodeInfo.rowEnumerable != null) {
       rows = nodeInfo.rowEnumerable.enumerator();
     } else {
-      final ArrayDeque<Row> queue = ((ListSink) nodeInfo.sink).list;
+      final ArrayDeque<Row> queue =
+          Iterables.getOnlyElement(nodeInfo.sinks.values()).list;
       rows = Linq4j.iterableEnumerator(queue);
     }
 
@@ -124,23 +137,6 @@ public class Interpreter extends AbstractEnumerable<Object[]>
   public void close() {
   }
 
-  /** Compiles an expression to an executable form. */
-  public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
-    if (inputRowType == null) {
-      inputRowType = dataContext.getTypeFactory().builder().build();
-    }
-    return scalarCompiler.compile(nodes, inputRowType);
-  }
-
-  RelDataType combinedRowType(List<RelNode> inputs) {
-    final RelDataTypeFactory.Builder builder =
-        dataContext.getTypeFactory().builder();
-    for (RelNode input : inputs) {
-      builder.addAll(input.getRowType().getFieldList());
-    }
-    return builder.build();
-  }
-
   /** Not used. */
   private class FooCompiler implements ScalarCompiler {
     public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
@@ -249,85 +245,16 @@ public class Interpreter extends AbstractEnumerable<Object[]>
     }
   }
 
-  public Source source(RelNode rel, int ordinal) {
-    final RelNode input = getInput(rel, ordinal);
-    final NodeInfo nodeInfo = nodes.get(input);
-    if (nodeInfo == null) {
-      throw new AssertionError("should be registered: " + rel);
-    }
-    if (nodeInfo.rowEnumerable != null) {
-      return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator());
-    }
-    Sink sink = nodeInfo.sink;
-    if (sink instanceof ListSink) {
-      return new ListSource((ListSink) nodeInfo.sink);
-    }
-    throw new IllegalStateException(
-      "Got a sink " + sink + " to which there is no match source type!");
-  }
-
-  private RelNode getInput(RelNode rel, int ordinal) {
-    final List<RelNode> inputs = relInputs.get(rel);
-    if (inputs != null) {
-      return inputs.get(ordinal);
-    }
-    return rel.getInput(ordinal);
-  }
-
-  /**
-   * Creates a Sink for a relational expression to write into.
-   *
-   * <p>This method is generally called from the constructor of a {@link Node}.
-   * But a constructor could instead call
-   * {@link #enumerable(RelNode, Enumerable)}.
-   *
-   * @param rel Relational expression
-   * @return Sink
-   */
-  public Sink sink(RelNode rel) {
-    final ArrayDeque<Row> queue = new ArrayDeque<>(1);
-    final Sink sink = new ListSink(queue);
-    NodeInfo nodeInfo = new NodeInfo(rel, sink, null);
-    nodes.put(rel, nodeInfo);
-    return sink;
-  }
-
-  /** Tells the interpreter that a given relational expression wishes to
-   * give its output as an enumerable.
-   *
-   * <p>This is as opposed to the norm, where a relational expression calls
-   * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that
-   * sink.
-   *
-   * @param rel Relational expression
-   * @param rowEnumerable Contents of relational expression
-   */
-  public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) {
-    NodeInfo nodeInfo = new NodeInfo(rel, null, rowEnumerable);
-    nodes.put(rel, nodeInfo);
-  }
-
-  public Context createContext() {
-    return new Context(dataContext);
-  }
-
-  public DataContext getDataContext() {
-    return dataContext;
-  }
-
   /** Information about a node registered in the data flow graph. */
   private static class NodeInfo {
     final RelNode rel;
-    final Sink sink;
+    final Map<Edge, ListSink> sinks = new LinkedHashMap<>();
     final Enumerable<Row> rowEnumerable;
     Node node;
 
-    NodeInfo(RelNode rel, Sink sink, Enumerable<Row> rowEnumerable) {
+    NodeInfo(RelNode rel, Enumerable<Row> rowEnumerable) {
       this.rel = rel;
-      this.sink = sink;
       this.rowEnumerable = rowEnumerable;
-      Preconditions.checkArgument((sink == null) != (rowEnumerable == null),
-          "one or the other");
     }
   }
 
@@ -386,15 +313,20 @@ public class Interpreter extends AbstractEnumerable<Object[]>
   /** Implementation of {@link Source} using a {@link java.util.ArrayDeque}. */
   private static class ListSource implements Source {
     private final ArrayDeque<Row> list;
+    private Iterator<Row> iterator = null;
 
-    ListSource(ListSink sink) {
-      this.list = sink.list;
+    ListSource(ArrayDeque<Row> list) {
+      this.list = list;
     }
 
     public Row receive() {
       try {
-        return list.remove();
+        if (iterator == null) {
+          iterator = list.iterator();
+        }
+        return iterator.next();
       } catch (NoSuchElementException e) {
+        iterator = null;
         return null;
       }
     }
@@ -404,6 +336,35 @@ public class Interpreter extends AbstractEnumerable<Object[]>
     }
   }
 
+  /** Implementation of {@link Sink} using a {@link java.util.ArrayDeque}. */
+  private static class DuplicatingSink implements Sink {
+    private List<ArrayDeque<Row>> queues;
+
+    private DuplicatingSink(List<ArrayDeque<Row>> queues) {
+      this.queues = ImmutableList.copyOf(queues);
+    }
+
+    public void send(Row row) throws InterruptedException {
+      for (ArrayDeque<Row> queue : queues) {
+        queue.add(row);
+      }
+    }
+
+    public void end() throws InterruptedException {
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override public void setSourceEnumerable(Enumerable<Row> enumerable)
+        throws InterruptedException {
+      // just copy over the source into the local list
+      final Enumerator<Row> enumerator = enumerable.enumerator();
+      while (enumerator.moveNext()) {
+        this.send(enumerator.current());
+      }
+      enumerator.close();
+    }
+  }
+
   /**
    * Walks over a tree of {@link org.apache.calcite.rel.RelNode} and, for each,
    * creates a {@link org.apache.calcite.interpreter.Node} that can be
@@ -417,25 +378,32 @@ public class Interpreter extends AbstractEnumerable<Object[]>
    * "visit" methods in this or a sub-class, and they will be found and called
    * via reflection.
    */
-  public static class Compiler extends RelVisitor implements ReflectiveVisitor {
-    private final ReflectiveVisitDispatcher<Compiler, RelNode> dispatcher =
-        ReflectUtil.createDispatcher(Compiler.class, RelNode.class);
+  static class CompilerImpl extends RelVisitor
+      implements Compiler, ReflectiveVisitor {
+    final ScalarCompiler scalarCompiler;
+    private final ReflectiveVisitDispatcher<CompilerImpl, RelNode> dispatcher =
+        ReflectUtil.createDispatcher(CompilerImpl.class, RelNode.class);
     protected final Interpreter interpreter;
     protected RelNode rootRel;
     protected RelNode rel;
     protected Node node;
+    final Map<RelNode, NodeInfo> nodes = new LinkedHashMap<>();
+    final Map<RelNode, List<RelNode>> relInputs = new HashMap<>();
+    final Multimap<RelNode, Edge> outEdges = LinkedHashMultimap.create();
 
     private static final String REWRITE_METHOD_NAME = "rewrite";
     private static final String VISIT_METHOD_NAME = "visit";
 
-    Compiler(Interpreter interpreter) {
+    CompilerImpl(Interpreter interpreter, RelOptCluster cluster) {
       this.interpreter = interpreter;
+      this.scalarCompiler = new JaninoRexCompiler(cluster.getRexBuilder());
     }
 
-    public RelNode visitRoot(RelNode p) {
+    /** Visits the tree, starting from the root {@code p}. */
+    Pair<RelNode, Map<RelNode, NodeInfo>> visitRoot(RelNode p) {
       rootRel = p;
       visit(p, 0, null);
-      return rootRel;
+      return Pair.of(rootRel, nodes);
     }
 
     @Override public void visit(RelNode p, int ordinal, RelNode parent) {
@@ -454,10 +422,10 @@ public class Interpreter extends AbstractEnumerable<Object[]>
         }
         p = rel;
         if (parent != null) {
-          List<RelNode> inputs = interpreter.relInputs.get(parent);
+          List<RelNode> inputs = relInputs.get(parent);
           if (inputs == null) {
             inputs = Lists.newArrayList(parent.getInputs());
-            interpreter.relInputs.put(parent, inputs);
+            relInputs.put(parent, inputs);
           }
           inputs.set(ordinal, p);
         } else {
@@ -466,7 +434,10 @@ public class Interpreter extends AbstractEnumerable<Object[]>
       }
 
       // rewrite children first (from left to right)
-      final List<RelNode> inputs = interpreter.relInputs.get(p);
+      final List<RelNode> inputs = relInputs.get(p);
+      for (Ord<RelNode> input : Ord.zip(Util.first(inputs, p.getInputs()))) {
+        outEdges.put(input.e, new Edge(p, input.i));
+      }
       if (inputs != null) {
         for (int i = 0; i < inputs.size(); i++) {
           RelNode input = inputs.get(i);
@@ -482,17 +453,22 @@ public class Interpreter extends AbstractEnumerable<Object[]>
         if (p instanceof InterpretableRel) {
           InterpretableRel interpretableRel = (InterpretableRel) p;
           node = interpretableRel.implement(
-              new InterpretableRel.InterpreterImplementor(interpreter, null,
-                  null));
+              new InterpretableRel.InterpreterImplementor(this, null, null));
         } else {
           // Probably need to add a visit(XxxRel) method to CoreCompiler.
           throw new AssertionError("interpreter: no implementation for "
               + p.getClass());
         }
       }
-      final NodeInfo nodeInfo = interpreter.nodes.get(p);
+      final NodeInfo nodeInfo = nodes.get(p);
       assert nodeInfo != null;
       nodeInfo.node = node;
+      if (inputs != null) {
+        for (int i = 0; i < inputs.size(); i++) {
+          final RelNode input = inputs.get(i);
+          visit(input, i, p);
+        }
+      }
     }
 
     /** Fallback rewrite method.
@@ -502,6 +478,95 @@ public class Interpreter extends AbstractEnumerable<Object[]>
      * rewrite. */
     public void rewrite(RelNode r) {
     }
+
+    public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) {
+      if (inputRowType == null) {
+        inputRowType = interpreter.dataContext.getTypeFactory().builder()
+            .build();
+      }
+      return scalarCompiler.compile(nodes, inputRowType);
+    }
+
+    public RelDataType combinedRowType(List<RelNode> inputs) {
+      final RelDataTypeFactory.Builder builder =
+          interpreter.dataContext.getTypeFactory().builder();
+      for (RelNode input : inputs) {
+        builder.addAll(input.getRowType().getFieldList());
+      }
+      return builder.build();
+    }
+
+    public Source source(RelNode rel, int ordinal) {
+      final RelNode input = getInput(rel, ordinal);
+      final Edge edge = new Edge(rel, ordinal);
+      final Collection<Edge> edges = outEdges.get(input);
+      final NodeInfo nodeInfo = nodes.get(input);
+      if (nodeInfo == null) {
+        throw new AssertionError("should be registered: " + rel);
+      }
+      if (nodeInfo.rowEnumerable != null) {
+        return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator());
+      }
+      assert nodeInfo.sinks.size() == edges.size();
+      final ListSink sink = nodeInfo.sinks.get(edge);
+      if (sink != null) {
+        return new ListSource(sink.list);
+      }
+      throw new IllegalStateException(
+          "Got a sink " + sink + " to which there is no match source type!");
+    }
+
+    private RelNode getInput(RelNode rel, int ordinal) {
+      final List<RelNode> inputs = relInputs.get(rel);
+      if (inputs != null) {
+        return inputs.get(ordinal);
+      }
+      return rel.getInput(ordinal);
+    }
+
+    public Sink sink(RelNode rel) {
+      final Collection<Edge> edges = outEdges.get(rel);
+      final Collection<Edge> edges2 = edges.isEmpty()
+          ? ImmutableList.of(new Edge(null, 0))
+          : edges;
+      NodeInfo nodeInfo = nodes.get(rel);
+      if (nodeInfo == null) {
+        nodeInfo = new NodeInfo(rel, null);
+        nodes.put(rel, nodeInfo);
+        for (Edge edge : edges2) {
+          nodeInfo.sinks.put(edge, new ListSink(new ArrayDeque<Row>()));
+        }
+      }
+      if (edges.size() == 1) {
+        return Iterables.getOnlyElement(nodeInfo.sinks.values());
+      } else {
+        final List<ArrayDeque<Row>> queues = new ArrayList<>();
+        for (ListSink sink : nodeInfo.sinks.values()) {
+          queues.add(sink.list);
+        }
+        return new DuplicatingSink(queues);
+      }
+    }
+
+    public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) {
+      NodeInfo nodeInfo = new NodeInfo(rel, rowEnumerable);
+      nodes.put(rel, nodeInfo);
+    }
+
+    public Context createContext() {
+      return new Context(getDataContext());
+    }
+
+    public DataContext getDataContext() {
+      return interpreter.dataContext;
+    }
+  }
+
+  /** Edge between a {@link RelNode} and one of its inputs. */
+  static class Edge extends Pair<RelNode, Integer> {
+    Edge(RelNode parent, int ordinal) {
+      super(parent, ordinal);
+    }
   }
 
   /** Converts a list of expressions to a scalar that can compute their

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
index 6e2ae15..349e26f 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
@@ -35,14 +35,14 @@ public class JoinNode implements Node {
   private final Scalar condition;
   private final Context context;
 
-  public JoinNode(Interpreter interpreter, Join rel) {
-    this.leftSource = interpreter.source(rel, 0);
-    this.rightSource = interpreter.source(rel, 1);
-    this.sink = interpreter.sink(rel);
-    this.condition = interpreter.compile(ImmutableList.of(rel.getCondition()),
-        interpreter.combinedRowType(rel.getInputs()));
+  public JoinNode(Compiler compiler, Join rel) {
+    this.leftSource = compiler.source(rel, 0);
+    this.rightSource = compiler.source(rel, 1);
+    this.sink = compiler.sink(rel);
+    this.condition = compiler.compile(ImmutableList.of(rel.getCondition()),
+        compiler.combinedRowType(rel.getInputs()));
     this.rel = rel;
-    this.context = interpreter.createContext();
+    this.context = compiler.createContext();
 
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
index 19f397a..b6f1b08 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.interpreter;
 
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
@@ -35,54 +36,53 @@ import com.google.common.collect.ImmutableList;
  */
 public class Nodes {
   /** Extension to
-   * {@link org.apache.calcite.interpreter.Interpreter.Compiler}
+   * {@link Interpreter.CompilerImpl}
    * that knows how to handle the core logical
    * {@link org.apache.calcite.rel.RelNode}s. */
-  public static class CoreCompiler extends Interpreter.Compiler {
-    CoreCompiler(Interpreter interpreter) {
-      super(interpreter);
+  public static class CoreCompiler extends Interpreter.CompilerImpl {
+    CoreCompiler(Interpreter interpreter, RelOptCluster cluster) {
+      super(interpreter, cluster);
     }
 
     public void visit(Aggregate agg) {
-      node = new AggregateNode(interpreter, agg);
+      node = new AggregateNode(this, agg);
     }
 
     public void visit(Filter filter) {
-      node = new FilterNode(interpreter, filter);
+      node = new FilterNode(this, filter);
     }
 
     public void visit(Project project) {
-      node = new ProjectNode(interpreter, project);
+      node = new ProjectNode(this, project);
     }
 
     public void visit(Values value) {
-      node = new ValuesNode(interpreter, value);
+      node = new ValuesNode(this, value);
     }
 
     public void visit(TableScan scan) {
-      node = TableScanNode.create(interpreter, scan,
-          ImmutableList.<RexNode>of(), null);
+      final ImmutableList<RexNode> filters = ImmutableList.of();
+      node = TableScanNode.create(this, scan, filters, null);
     }
 
     public void visit(Bindables.BindableTableScan scan) {
-      node = TableScanNode.create(interpreter, scan, scan.filters,
-          scan.projects);
+      node = TableScanNode.create(this, scan, scan.filters, scan.projects);
     }
 
     public void visit(Sort sort) {
-      node = new SortNode(interpreter, sort);
+      node = new SortNode(this, sort);
     }
 
     public void visit(Union union) {
-      node = new UnionNode(interpreter, union);
+      node = new UnionNode(this, union);
     }
 
     public void visit(Join join) {
-      node = new JoinNode(interpreter, join);
+      node = new JoinNode(this, join);
     }
 
     public void visit(Window window) {
-      node = new WindowNode(interpreter, window);
+      node = new WindowNode(this, window);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
index 4c280b7..2503b1cd 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
@@ -27,12 +27,12 @@ public class ProjectNode extends AbstractSingleNode<Project> {
   private final Context context;
   private final int projectCount;
 
-  public ProjectNode(Interpreter interpreter, Project rel) {
-    super(interpreter, rel);
+  public ProjectNode(Compiler compiler, Project rel) {
+    super(compiler, rel);
     this.projectCount = rel.getProjects().size();
-    this.scalar = interpreter.compile(rel.getProjects(),
+    this.scalar = compiler.compile(rel.getProjects(),
         rel.getInput().getRowType());
-    this.context = interpreter.createContext();
+    this.context = compiler.createContext();
   }
 
   public void run() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
index 5eaaaf6..dff97a6 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
@@ -34,8 +34,8 @@ import java.util.List;
  * {@link org.apache.calcite.rel.core.Sort}.
  */
 public class SortNode extends AbstractSingleNode<Sort> {
-  public SortNode(Interpreter interpreter, Sort rel) {
-    super(interpreter, rel);
+  public SortNode(Compiler compiler, Sort rel) {
+    super(compiler, rel);
   }
 
   public void run() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
index 9cec9e6..9c8124e 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -59,12 +59,12 @@ import static org.apache.calcite.util.Static.RESOURCE;
  * {@link org.apache.calcite.rel.core.TableScan}.
  */
 public class TableScanNode implements Node {
-  private TableScanNode(Interpreter interpreter, TableScan rel,
+  private TableScanNode(Compiler compiler, TableScan rel,
       Enumerable<Row> enumerable) {
-    interpreter.enumerable(rel, enumerable);
+    compiler.enumerable(rel, enumerable);
   }
 
-  public void run() throws InterruptedException {
+  public void run() {
     // nothing to do
   }
 
@@ -73,56 +73,56 @@ public class TableScanNode implements Node {
    * <p>Tries various table SPIs, and negotiates with the table which filters
    * and projects it can implement. Adds to the Enumerable implementations of
    * any filters and projects that cannot be implemented by the table. */
-  static TableScanNode create(Interpreter interpreter, TableScan rel,
+  static TableScanNode create(Compiler compiler, TableScan rel,
       ImmutableList<RexNode> filters, ImmutableIntList projects) {
     final RelOptTable relOptTable = rel.getTable();
     final ProjectableFilterableTable pfTable =
         relOptTable.unwrap(ProjectableFilterableTable.class);
     if (pfTable != null) {
-      return createProjectableFilterable(interpreter, rel, filters, projects,
+      return createProjectableFilterable(compiler, rel, filters, projects,
           pfTable);
     }
     final FilterableTable filterableTable =
         relOptTable.unwrap(FilterableTable.class);
     if (filterableTable != null) {
-      return createFilterable(interpreter, rel, filters, projects,
+      return createFilterable(compiler, rel, filters, projects,
           filterableTable);
     }
     final ScannableTable scannableTable =
         relOptTable.unwrap(ScannableTable.class);
     if (scannableTable != null) {
-      return createScannable(interpreter, rel, filters, projects,
+      return createScannable(compiler, rel, filters, projects,
           scannableTable);
     }
     //noinspection unchecked
     final Enumerable<Row> enumerable = relOptTable.unwrap(Enumerable.class);
     if (enumerable != null) {
-      return createEnumerable(interpreter, rel, enumerable, null, filters,
+      return createEnumerable(compiler, rel, enumerable, null, filters,
           projects);
     }
     final QueryableTable queryableTable =
         relOptTable.unwrap(QueryableTable.class);
     if (queryableTable != null) {
-      return createQueryable(interpreter, rel, filters, projects,
+      return createQueryable(compiler, rel, filters, projects,
           queryableTable);
     }
     throw new AssertionError("cannot convert table " + relOptTable
         + " to enumerable");
   }
 
-  private static TableScanNode createScannable(Interpreter interpreter,
-      TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
+  private static TableScanNode createScannable(Compiler compiler, TableScan rel,
+      ImmutableList<RexNode> filters, ImmutableIntList projects,
       ScannableTable scannableTable) {
     final Enumerable<Row> rowEnumerable =
-        Enumerables.toRow(scannableTable.scan(interpreter.getDataContext()));
-    return createEnumerable(interpreter, rel, rowEnumerable, null, filters,
+        Enumerables.toRow(scannableTable.scan(compiler.getDataContext()));
+    return createEnumerable(compiler, rel, rowEnumerable, null, filters,
         projects);
   }
 
-  private static TableScanNode createQueryable(Interpreter interpreter,
+  private static TableScanNode createQueryable(Compiler compiler,
       TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
       QueryableTable queryableTable) {
-    final DataContext root = interpreter.getDataContext();
+    final DataContext root = compiler.getDataContext();
     final RelOptTable relOptTable = rel.getTable();
     final Type elementType = queryableTable.getElementType();
     SchemaPlus schema = root.getRootSchema();
@@ -163,14 +163,14 @@ public class TableScanNode implements Node {
       rowEnumerable =
           Schemas.queryable(root, Row.class, relOptTable.getQualifiedName());
     }
-    return createEnumerable(interpreter, rel, rowEnumerable, null, filters,
+    return createEnumerable(compiler, rel, rowEnumerable, null, filters,
         projects);
   }
 
-  private static TableScanNode createFilterable(Interpreter interpreter,
+  private static TableScanNode createFilterable(Compiler compiler,
       TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
       FilterableTable filterableTable) {
-    final DataContext root = interpreter.getDataContext();
+    final DataContext root = compiler.getDataContext();
     final List<RexNode> mutableFilters = Lists.newArrayList(filters);
     final Enumerable<Object[]> enumerable =
         filterableTable.scan(root, mutableFilters);
@@ -180,14 +180,14 @@ public class TableScanNode implements Node {
       }
     }
     final Enumerable<Row> rowEnumerable = Enumerables.toRow(enumerable);
-    return createEnumerable(interpreter, rel, rowEnumerable, null,
+    return createEnumerable(compiler, rel, rowEnumerable, null,
         mutableFilters, projects);
   }
 
-  private static TableScanNode createProjectableFilterable(
-      Interpreter interpreter, TableScan rel, ImmutableList<RexNode> filters,
-      ImmutableIntList projects, ProjectableFilterableTable pfTable) {
-    final DataContext root = interpreter.getDataContext();
+  private static TableScanNode createProjectableFilterable(Compiler compiler,
+      TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects,
+      ProjectableFilterableTable pfTable) {
+    final DataContext root = compiler.getDataContext();
     final ImmutableIntList originalProjects = projects;
     for (;;) {
       final List<RexNode> mutableFilters = Lists.newArrayList(filters);
@@ -234,19 +234,20 @@ public class TableScanNode implements Node {
         // project the leading columns.
         rejectedProjects = ImmutableIntList.identity(originalProjects.size());
       }
-      return createEnumerable(interpreter, rel, rowEnumerable, projects,
+      return createEnumerable(compiler, rel, rowEnumerable, projects,
           mutableFilters, rejectedProjects);
     }
   }
 
-  private static TableScanNode createEnumerable(
-      Interpreter interpreter, TableScan rel,
-      Enumerable<Row> enumerable, final ImmutableIntList acceptedProjects,
-      List<RexNode> rejectedFilters, final ImmutableIntList rejectedProjects) {
+  private static TableScanNode createEnumerable(Compiler compiler,
+      TableScan rel, Enumerable<Row> enumerable,
+      final ImmutableIntList acceptedProjects, List<RexNode> rejectedFilters,
+      final ImmutableIntList rejectedProjects) {
     if (!rejectedFilters.isEmpty()) {
       final RexNode filter =
           RexUtil.composeConjunction(rel.getCluster().getRexBuilder(),
               rejectedFilters, false);
+      assert filter != null;
       // Re-map filter for the projects that have been applied already
       final RexNode filter2;
       final RelDataType inputRowType;
@@ -267,9 +268,8 @@ public class TableScanNode implements Node {
         inputRowType = builder.build();
       }
       final Scalar condition =
-          interpreter.compile(
-              ImmutableList.of(filter2), inputRowType);
-      final Context context = interpreter.createContext();
+          compiler.compile(ImmutableList.of(filter2), inputRowType);
+      final Context context = compiler.createContext();
       enumerable = enumerable.where(
           new Predicate1<Row>() {
             @Override public boolean apply(Row row) {
@@ -292,7 +292,7 @@ public class TableScanNode implements Node {
             }
           });
     }
-    return new TableScanNode(interpreter, rel, enumerable);
+    return new TableScanNode(compiler, rel, enumerable);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
index 701dbc7..0f6fe4f 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
@@ -32,13 +32,13 @@ public class UnionNode implements Node {
   private final Sink sink;
   private final Union rel;
 
-  public UnionNode(Interpreter interpreter, Union rel) {
+  public UnionNode(Compiler compiler, Union rel) {
     ImmutableList.Builder<Source> builder = ImmutableList.builder();
     for (int i = 0; i < rel.getInputs().size(); i++) {
-      builder.add(interpreter.source(rel, i));
+      builder.add(compiler.source(rel, i));
     }
     this.sources = builder.build();
-    this.sink = interpreter.sink(rel);
+    this.sink = compiler.sink(rel);
     this.rel = rel;
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
index 008d6f0..1c6f255 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
@@ -34,21 +34,21 @@ public class ValuesNode implements Node {
   private final int fieldCount;
   private final ImmutableList<Row> rows;
 
-  public ValuesNode(Interpreter interpreter, Values rel) {
-    this.sink = interpreter.sink(rel);
+  public ValuesNode(Compiler compiler, Values rel) {
+    this.sink = compiler.sink(rel);
     this.fieldCount = rel.getRowType().getFieldCount();
-    this.rows = createRows(interpreter, rel.getTuples());
+    this.rows = createRows(compiler, rel.getTuples());
   }
 
-  private ImmutableList<Row> createRows(Interpreter interpreter,
+  private ImmutableList<Row> createRows(Compiler compiler,
       ImmutableList<ImmutableList<RexLiteral>> tuples) {
     final List<RexNode> nodes = Lists.newArrayList();
     for (ImmutableList<RexLiteral> tuple : tuples) {
       nodes.addAll(tuple);
     }
-    final Scalar scalar = interpreter.compile(nodes, null);
+    final Scalar scalar = compiler.compile(nodes, null);
     final Object[] values = new Object[nodes.size()];
-    final Context context = interpreter.createContext();
+    final Context context = compiler.createContext();
     scalar.execute(context, values);
     final ImmutableList.Builder<Row> rows = ImmutableList.builder();
     Object[] subValues = new Object[fieldCount];

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
index a03608c..eff4bea 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
@@ -23,8 +23,8 @@ import org.apache.calcite.rel.core.Window;
  * {@link org.apache.calcite.rel.core.Window}.
  */
 public class WindowNode extends AbstractSingleNode<Window> {
-  WindowNode(Interpreter interpreter, Window rel) {
-    super(interpreter, rel);
+  WindowNode(Compiler compiler, Window rel) {
+    super(compiler, rel);
   }
 
   public void run() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 2469841..8d3ea64 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -426,7 +426,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
   }
 
   @Override public Node implement(InterpreterImplementor implementor) {
-    return new DruidQueryNode(implementor.interpreter, this);
+    return new DruidQueryNode(implementor.compiler, this);
   }
 
   public QuerySpec getQuerySpec() {


[2/2] calcite git commit: [CALCITE-2116] In HepPlanner, ensure that common sub-expressions have the same digest (LeoWangLZ)

Posted by jh...@apache.org.
[CALCITE-2116] In HepPlanner, ensure that common sub-expressions have the same digest (LeoWangLZ)

Close apache/calcite#597


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/c0f912dd
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/c0f912dd
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/c0f912dd

Branch: refs/heads/master
Commit: c0f912ddff717eaedb9a2d652e95f232d10e207f
Parents: 7b85d44
Author: LeoWangLZ <wa...@163.com>
Authored: Fri Jan 5 10:15:24 2018 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Jan 9 23:01:29 2018 -0800

----------------------------------------------------------------------
 .../org/apache/calcite/plan/hep/HepPlanner.java |  5 +++-
 .../org/apache/calcite/test/HepPlannerTest.java | 25 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/c0f912dd/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java b/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
index 657460a..d43bbd8 100644
--- a/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java
@@ -811,6 +811,9 @@ public class HepPlanner extends AbstractRelOptPlanner {
       rel = rel.copy(rel.getTraitSet(), newInputs);
       onCopy(oldRel, rel);
     }
+    // Compute digest first time we add to DAG,
+    // otherwise can't get equivVertex for common sub-expression
+    rel.recomputeDigest();
 
     // try to find equivalent rel only if DAG is allowed
     if (!noDAG) {
@@ -886,7 +889,7 @@ public class HepPlanner extends AbstractRelOptPlanner {
     if (mapDigestToVertex.get(oldDigest) == vertex) {
       mapDigestToVertex.remove(oldDigest);
     }
-    String newDigest = rel.recomputeDigest();
+    String newDigest = rel.getDigest();
     // When a transformation happened in one rule apply, support
     // vertex2 replace vertex1, but the current relNode of
     // vertex1 and vertex2 is same,

http://git-wip-us.apache.org/repos/asf/calcite/blob/c0f912dd/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java b/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
index e3e144b..7a08d0c 100644
--- a/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/test/HepPlannerTest.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalUnion;
@@ -248,6 +249,30 @@ public class HepPlannerTest extends RelOptTestBase {
             + " (select * from dept) d2");
   }
 
+  /** Tests that if two relational expressions are equivalent, the planner
+   * notices, and only applies the rule once. */
+  @Test public void testCommonSubExpression() {
+    // In the following,
+    //   (select 1 from dept where abs(-1)=20)
+    // occurs twice, but it's a common sub-expression, so the rule should only
+    // apply once.
+    HepProgramBuilder programBuilder = HepProgram.builder();
+    programBuilder.addRuleInstance(FilterToCalcRule.INSTANCE);
+
+    final HepTestListener listener = new HepTestListener(0);
+    HepPlanner planner = new HepPlanner(programBuilder.build());
+    planner.addListener(listener);
+
+    final String sql = "(select 1 from dept where abs(-1)=20)\n"
+        + "union all\n"
+        + "(select 1 from dept where abs(-1)=20)";
+    planner.setRoot(tester.convertSqlToRel(sql).rel);
+    RelNode bestRel = planner.findBestExp();
+
+    assertThat(bestRel.getInput(0).equals(bestRel.getInput(1)), is(true));
+    assertThat(listener.getApplyTimes() == 1, is(true));
+  }
+
   @Test public void testSubprogram() throws Exception {
     // Verify that subprogram gets re-executed until fixpoint.
     // In this case, the first time through we limit it to generate