You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/01/22 01:33:39 UTC

[2/3] incubator-calcite git commit: [CALCITE-544] Implement Union in Interpreter; [CALCITE-562] Implement inner JOIN in interpreter and improve handling of scalar expressions

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
new file mode 100644
index 0000000..cfe9b37
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.enumerable.AggImplementor;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Utilities pertaining to {@link BindableRel} and {@link BindableConvention}.
+ */
+public class Bindables {
+  private Bindables() {}
+
+  public static final RelOptRule BINDABLE_TABLE_RULE =
+      new BindableTableScanRule();
+
+  public static final RelOptRule BINDABLE_FILTER_RULE =
+      new BindableFilterRule();
+
+  public static final RelOptRule BINDABLE_PROJECT_RULE =
+      new BindableProjectRule();
+
+  public static final RelOptRule BINDABLE_SORT_RULE =
+      new BindableSortRule();
+
+  public static final RelOptRule BINDABLE_JOIN_RULE =
+      new BindableJoinRule();
+
+  public static final RelOptRule BINDABLE_UNION_RULE =
+      new BindableUnionRule();
+
+  public static final RelOptRule BINDABLE_VALUES_RULE =
+      new BindableValuesRule();
+
+  public static final RelOptRule BINDABLE_AGGREGATE_RULE =
+      new BindableAggregateRule();
+
+  public static final RelOptRule BINDABLE_WINDOW_RULE =
+      new BindableWindowRule();
+
+  /** All rules that convert logical relational expression to bindable. */
+  public static final ImmutableList<RelOptRule> RULES =
+      ImmutableList.of(
+          NoneToBindableConverterRule.INSTANCE,
+          BINDABLE_TABLE_RULE,
+          BINDABLE_FILTER_RULE,
+          BINDABLE_PROJECT_RULE,
+          BINDABLE_SORT_RULE,
+          BINDABLE_JOIN_RULE,
+          BINDABLE_UNION_RULE,
+          BINDABLE_VALUES_RULE,
+          BINDABLE_AGGREGATE_RULE,
+          BINDABLE_WINDOW_RULE);
+
+  /** Helper method that converts a bindable relational expression into a
+   * record iterator.
+   *
+   * <p>Any bindable can be compiled; if its input is also bindable, it becomes
+   * part of the same compilation unit.
+   */
+  private static Enumerable<Object[]> help(DataContext dataContext,
+      BindableRel rel) {
+    return new Interpreter(dataContext, rel);
+  }
+
+  /** Rule that converts a {@link ScannableTable} to bindable convention. */
+  private static class BindableTableScanRule extends RelOptRule {
+    public BindableTableScanRule() {
+      super(operand(TableScan.class, none()));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final TableScan scan = call.rel(0);
+      call.transformTo(
+          new BindableTableScan(scan.getCluster(),
+              scan.getTraitSet().replace(BindableConvention.INSTANCE),
+              scan.getTable()));
+    }
+  }
+
+  /** Scan of a table that implements {@link ScannableTable} and therefore can
+   * be converted into an {@link Enumerable}. */
+  private static class BindableTableScan
+      extends TableScan implements BindableRel {
+    BindableTableScan(RelOptCluster cluster, RelTraitSet traits,
+        RelOptTable table) {
+      super(cluster, traits, table);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return table.unwrap(ScannableTable.class).scan(dataContext);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      throw new UnsupportedOperationException(); // TODO:
+    }
+  }
+
+  /** Rule that converts a {@link Filter} to bindable convention. */
+  private static class BindableFilterRule extends ConverterRule {
+    private BindableFilterRule() {
+      super(LogicalFilter.class, RelOptUtil.FILTER_PREDICATE, Convention.NONE,
+          BindableConvention.INSTANCE, "BindableFilterRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalFilter filter = (LogicalFilter) rel;
+      return new BindableFilter(rel.getCluster(),
+          rel.getTraitSet().replace(BindableConvention.INSTANCE),
+          convert(filter.getInput(),
+              filter.getInput().getTraitSet()
+                  .replace(BindableConvention.INSTANCE)),
+          filter.getCondition());
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Filter}
+   * in bindable convention. */
+  public static class BindableFilter extends Filter implements BindableRel {
+    public BindableFilter(RelOptCluster cluster, RelTraitSet traitSet,
+        RelNode child, RexNode condition) {
+      super(cluster, traitSet, child, condition);
+      assert getConvention() instanceof BindableConvention;
+    }
+
+    public BindableFilter copy(RelTraitSet traitSet, RelNode input,
+        RexNode condition) {
+      return new BindableFilter(getCluster(), traitSet, input, condition);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new FilterNode(implementor.interpreter, this);
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
+   * to a {@link BindableProject}.
+   */
+  private static class BindableProjectRule extends ConverterRule {
+    BindableProjectRule() {
+      super(LogicalProject.class, RelOptUtil.PROJECT_PREDICATE, Convention.NONE,
+          BindableConvention.INSTANCE, "BindableProjectRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalProject project = (LogicalProject) rel;
+      return new BindableProject(rel.getCluster(),
+          rel.getTraitSet().replace(BindableConvention.INSTANCE),
+          convert(project.getInput(),
+              project.getInput().getTraitSet()
+                  .replace(BindableConvention.INSTANCE)),
+          project.getProjects(),
+          project.getRowType(),
+          Project.Flags.BOXED);
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Project} in
+   * bindable calling convention. */
+  public static class BindableProject extends Project implements BindableRel {
+    public BindableProject(RelOptCluster cluster, RelTraitSet traitSet,
+        RelNode child, List<? extends RexNode> exps, RelDataType rowType,
+        int flags) {
+      super(cluster, traitSet, child, exps, rowType, flags);
+      assert getConvention() instanceof BindableConvention;
+    }
+
+    public BindableProject copy(RelTraitSet traitSet, RelNode input,
+        List<RexNode> exps, RelDataType rowType) {
+      return new BindableProject(getCluster(), traitSet, input, exps, rowType,
+          flags);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new ProjectNode(implementor.interpreter, this);
+    }
+  }
+
+  /**
+   * Rule to convert an {@link org.apache.calcite.rel.core.Sort} to a
+   * {@link org.apache.calcite.interpreter.Bindables.BindableSort}.
+   */
+  private static class BindableSortRule extends ConverterRule {
+    BindableSortRule() {
+      super(Sort.class, Convention.NONE, BindableConvention.INSTANCE,
+          "BindableSortRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final Sort sort = (Sort) rel;
+      final RelTraitSet traitSet =
+          sort.getTraitSet().replace(BindableConvention.INSTANCE);
+      final RelNode input = sort.getInput();
+      return new BindableSort(rel.getCluster(), traitSet,
+          convert(input,
+              input.getTraitSet().replace(BindableConvention.INSTANCE)),
+          sort.getCollation(), sort.offset, sort.fetch);
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Sort}
+   * bindable calling convention. */
+  public static class BindableSort extends Sort implements BindableRel {
+    public BindableSort(RelOptCluster cluster, RelTraitSet traitSet,
+        RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+      super(cluster, traitSet, child, collation, offset, fetch);
+      assert getConvention() instanceof BindableConvention;
+      assert getConvention() == child.getConvention();
+    }
+
+    @Override public BindableSort copy(RelTraitSet traitSet, RelNode newInput,
+        RelCollation newCollation, RexNode offset, RexNode fetch) {
+      return new BindableSort(getCluster(), traitSet, newInput, newCollation,
+          offset, fetch);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new SortNode(implementor.interpreter, this);
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalJoin}
+   * to a {@link BindableJoin}.
+   */
+  private static class BindableJoinRule extends ConverterRule {
+    BindableJoinRule() {
+      super(LogicalJoin.class, Convention.NONE, BindableConvention.INSTANCE,
+          "BindableJoinRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalJoin join = (LogicalJoin) rel;
+      final BindableConvention out = BindableConvention.INSTANCE;
+      final RelTraitSet traitSet = join.getTraitSet().replace(out);
+      return new BindableJoin(rel.getCluster(), traitSet,
+          convert(join.getLeft(),
+              join.getLeft().getTraitSet()
+                  .replace(BindableConvention.INSTANCE)),
+          convert(join.getRight(),
+              join.getRight().getTraitSet()
+                  .replace(BindableConvention.INSTANCE)),
+          join.getCondition(), join.getJoinType(), join.getVariablesStopped());
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Join} in
+   * bindable calling convention. */
+  public static class BindableJoin extends Join implements BindableRel {
+    protected BindableJoin(RelOptCluster cluster, RelTraitSet traits,
+        RelNode left, RelNode right, RexNode condition, JoinRelType joinType,
+        Set<String> variablesStopped) {
+      super(cluster, traits, left, right, condition, joinType,
+          variablesStopped);
+    }
+
+    public BindableJoin copy(RelTraitSet traitSet, RexNode conditionExpr,
+        RelNode left, RelNode right, JoinRelType joinType,
+        boolean semiJoinDone) {
+      return new BindableJoin(getCluster(), traitSet, left, right,
+          conditionExpr, joinType, variablesStopped);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new JoinNode(implementor.interpreter, this);
+    }
+  }
+
+  /**
+   * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalUnion}
+   * to a {@link BindableUnion}.
+   */
+  private static class BindableUnionRule extends ConverterRule {
+    BindableUnionRule() {
+      super(LogicalUnion.class, Convention.NONE, BindableConvention.INSTANCE,
+          "BindableUnionRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalUnion union = (LogicalUnion) rel;
+      final BindableConvention out = BindableConvention.INSTANCE;
+      final RelTraitSet traitSet = union.getTraitSet().replace(out);
+      return new BindableUnion(rel.getCluster(), traitSet,
+          convertList(union.getInputs(), out), union.all);
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Union} in
+   * bindable calling convention. */
+  public static class BindableUnion extends Union implements BindableRel {
+    public BindableUnion(RelOptCluster cluster, RelTraitSet traitSet,
+        List<RelNode> inputs, boolean all) {
+      super(cluster, traitSet, inputs, all);
+    }
+
+    public BindableUnion copy(RelTraitSet traitSet, List<RelNode> inputs,
+        boolean all) {
+      return new BindableUnion(getCluster(), traitSet, inputs, all);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new UnionNode(implementor.interpreter, this);
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Values}
+   * in bindable calling convention. */
+  public static class BindableValues extends Values implements BindableRel {
+    BindableValues(RelOptCluster cluster, RelDataType rowType,
+        ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traitSet) {
+      super(cluster, rowType, tuples, traitSet);
+    }
+
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+      assert inputs.isEmpty();
+      return new BindableValues(getCluster(), rowType, tuples, traitSet);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new ValuesNode(implementor.interpreter, this);
+    }
+  }
+
+  /** Rule that converts a {@link Values} to bindable convention. */
+  private static class BindableValuesRule extends ConverterRule {
+    BindableValuesRule() {
+      super(LogicalValues.class, Convention.NONE, BindableConvention.INSTANCE,
+          "BindableValuesRule");
+    }
+
+    @Override public RelNode convert(RelNode rel) {
+      LogicalValues values = (LogicalValues) rel;
+      return new BindableValues(values.getCluster(), values.getRowType(),
+          values.getTuples(),
+          values.getTraitSet().replace(BindableConvention.INSTANCE));
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Aggregate}
+   * in bindable calling convention. */
+  public static class BindableAggregate extends Aggregate
+      implements BindableRel {
+    public BindableAggregate(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode child,
+        boolean indicator,
+        ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets,
+        List<AggregateCall> aggCalls)
+        throws InvalidRelException {
+      super(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls);
+      assert getConvention() instanceof BindableConvention;
+
+      for (AggregateCall aggCall : aggCalls) {
+        if (aggCall.isDistinct()) {
+          throw new InvalidRelException(
+              "distinct aggregation not supported");
+        }
+        AggImplementor implementor2 =
+            RexImpTable.INSTANCE.get(aggCall.getAggregation(), false);
+        if (implementor2 == null) {
+          throw new InvalidRelException(
+              "aggregation " + aggCall.getAggregation() + " not supported");
+        }
+      }
+    }
+
+    @Override public BindableAggregate copy(RelTraitSet traitSet, RelNode input,
+        boolean indicator, ImmutableBitSet groupSet,
+        List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+      try {
+        return new BindableAggregate(getCluster(), traitSet, input, indicator,
+            groupSet, groupSets, aggCalls);
+      } catch (InvalidRelException e) {
+        // Semantic error not possible. Must be a bug. Convert to
+        // internal error.
+        throw new AssertionError(e);
+      }
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new AggregateNode(implementor.interpreter, this);
+    }
+  }
+
+  /** Rule that converts an {@link Aggregate} to bindable convention. */
+  private static class BindableAggregateRule extends ConverterRule {
+    BindableAggregateRule() {
+      super(LogicalAggregate.class, Convention.NONE,
+          BindableConvention.INSTANCE, "BindableAggregateRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalAggregate agg = (LogicalAggregate) rel;
+      final RelTraitSet traitSet =
+          agg.getTraitSet().replace(BindableConvention.INSTANCE);
+      try {
+        return new BindableAggregate(rel.getCluster(), traitSet,
+            convert(agg.getInput(), traitSet), agg.indicator, agg.getGroupSet(),
+            agg.getGroupSets(), agg.getAggCallList());
+      } catch (InvalidRelException e) {
+        RelOptPlanner.LOGGER.fine(e.toString());
+        return null;
+      }
+    }
+  }
+
+  /** Implementation of {@link org.apache.calcite.rel.core.Window}
+   * in bindable convention. */
+  public static class BindableWindow extends Window implements BindableRel {
+    /** Creates an BindableWindowRel. */
+    BindableWindow(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+        List<RexLiteral> constants, RelDataType rowType, List<Group> groups) {
+      super(cluster, traits, child, constants, rowType, groups);
+    }
+
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+      return new BindableWindow(getCluster(), traitSet, sole(inputs),
+          constants, rowType, groups);
+    }
+
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
+      return super.computeSelfCost(planner)
+          .multiplyBy(BindableConvention.COST_MULTIPLIER);
+    }
+
+    public Class<Object[]> getElementType() {
+      return Object[].class;
+    }
+
+    public Enumerable<Object[]> bind(DataContext dataContext) {
+      return help(dataContext, this);
+    }
+
+    public Node implement(InterpreterImplementor implementor) {
+      return new WindowNode(implementor.interpreter, this);
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalWindow}
+   * to a {@link BindableWindow}.
+   */
+  private static class BindableWindowRule extends ConverterRule {
+    BindableWindowRule() {
+      super(LogicalWindow.class, Convention.NONE, BindableConvention.INSTANCE,
+          "BindableWindowRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalWindow winAgg = (LogicalWindow) rel;
+      final RelTraitSet traitSet =
+          winAgg.getTraitSet().replace(BindableConvention.INSTANCE);
+      final RelNode child = winAgg.getInput();
+      final RelNode convertedChild =
+          convert(child,
+              child.getTraitSet().replace(BindableConvention.INSTANCE));
+      return new BindableWindow(rel.getCluster(), traitSet, convertedChild,
+          winAgg.getConstants(), winAgg.getRowType(), winAgg.groups);
+    }
+  }
+}
+
+// End Bindables.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Context.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Context.java b/core/src/main/java/org/apache/calcite/interpreter/Context.java
index fc80319..f125454 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Context.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Context.java
@@ -16,12 +16,20 @@
  */
 package org.apache.calcite.interpreter;
 
+import org.apache.calcite.DataContext;
+
 /**
  * Context for executing a scalar expression in an interpreter.
  */
 public class Context {
+  public final DataContext root;
+
   /** Values of incoming columns from all inputs. */
   public Object[] values;
+
+  Context(DataContext root) {
+    this.root = root;
+  }
 }
 
 // End Context.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
index d5461c4..e06e12d 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
@@ -18,20 +18,21 @@ package org.apache.calcite.interpreter;
 
 import org.apache.calcite.rel.core.Filter;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * Interpreter node that implements a
  * {@link org.apache.calcite.rel.core.Filter}.
  */
-public class FilterNode implements Node {
+public class FilterNode extends AbstractSingleNode<Filter> {
   private final Scalar condition;
-  private final Source source;
-  private final Sink sink;
   private final Context context;
 
   public FilterNode(Interpreter interpreter, Filter rel) {
-    this.condition = interpreter.compile(rel.getCondition());
-    this.source = interpreter.source(rel, 0);
-    this.sink = interpreter.sink(rel);
+    super(interpreter, rel);
+    this.condition =
+        interpreter.compile(ImmutableList.of(rel.getCondition()),
+            rel.getInputs());
     this.context = interpreter.createContext();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java
new file mode 100644
index 0000000..fb41d9d
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+
+/**
+ * Calling convention that returns results as an
+ * {@link org.apache.calcite.linq4j.Enumerable} of object arrays.
+ *
+ * <p>Unlike enumerable convention, no code generation is required.
+ */
+public enum InterpretableConvention implements Convention {
+  INSTANCE;
+
+  @Override public String toString() {
+    return getName();
+  }
+
+  public Class getInterface() {
+    return EnumerableRel.class;
+  }
+
+  public String getName() {
+    return "INTERPRETABLE";
+  }
+
+  public RelTraitDef getTraitDef() {
+    return ConventionTraitDef.INSTANCE;
+  }
+
+  public boolean subsumes(RelTrait trait) {
+    return this == trait;
+  }
+
+  public void register(RelOptPlanner planner) {}
+}
+
+// End InterpretableConvention.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java
new file mode 100644
index 0000000..8b62174
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.runtime.ArrayBindable;
+
+import java.util.List;
+
+/**
+ * Relational expression that converts any relational expression input to
+ * {@link org.apache.calcite.interpreter.InterpretableConvention}, by wrapping
+ * it in an interpreter.
+ */
+public class InterpretableConverter extends ConverterImpl
+    implements ArrayBindable {
+  protected InterpretableConverter(RelOptCluster cluster, RelTraitSet traits,
+      RelNode input) {
+    super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new InterpretableConverter(getCluster(), traitSet, sole(inputs));
+  }
+
+  public Class<Object[]> getElementType() {
+    return Object[].class;
+  }
+
+  public Enumerable<Object[]> bind(DataContext dataContext) {
+    return new Interpreter(dataContext, getInput());
+  }
+}
+
+// End InterpretableConverter.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
new file mode 100644
index 0000000..340299e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.rel.RelNode;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Relational expression that can implement itself using an interpreter.
+ */
+public interface InterpretableRel extends RelNode {
+  /** Creates an interpreter node to implement this relational expression. */
+  Node implement(InterpreterImplementor implementor);
+
+  /** Context when a {@link RelNode} is being converted to an interpreter
+   * {@link Node}. */
+  class InterpreterImplementor {
+    public final Interpreter interpreter;
+    public final Map<String, Object> internalParameters =
+        Maps.newLinkedHashMap();
+    public final CalcitePrepare.SparkHandler spark;
+    public final DataContext dataContext;
+    public final Map<RelNode, List<Sink>> relSinks = Maps.newHashMap();
+
+    public InterpreterImplementor(Interpreter interpreter,
+        CalcitePrepare.SparkHandler spark,
+        DataContext dataContext) {
+      this.interpreter = interpreter;
+      this.spark = spark;
+      this.dataContext = dataContext;
+    }
+  }
+}
+
+// End InterpretableRel.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index cbcfa64..93f18ef 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -26,16 +26,15 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitDispatcher;
 import org.apache.calcite.util.ReflectiveVisitor;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.math.BigDecimal;
-import java.util.AbstractList;
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Map;
@@ -53,9 +52,12 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
   private final DataContext dataContext;
   private final RelNode rootRel;
   private final Map<RelNode, List<RelNode>> relInputs = Maps.newHashMap();
+  protected final ScalarCompiler scalarCompiler;
 
   public Interpreter(DataContext dataContext, RelNode rootRel) {
     this.dataContext = dataContext;
+    this.scalarCompiler =
+        new JaninoRexCompiler(rootRel.getCluster().getRexBuilder());
     Compiler compiler = new Nodes.CoreCompiler(this);
     this.rootRel = compiler.visitRoot(rootRel);
   }
@@ -106,96 +108,116 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
   }
 
   /** Compiles an expression to an executable form. */
-  public Scalar compile(final RexNode node) {
-    if (node instanceof RexCall) {
-      final RexCall call = (RexCall) node;
-      final ImmutableList.Builder<Scalar> list = ImmutableList.builder();
-      for (RexNode operand : call.getOperands()) {
-        list.add(compile(operand));
-      }
-      final ImmutableList<Scalar> scalars = list.build();
-      return new Scalar() {
-        public Object execute(final Context context) {
-          final List<Object> args;
-          Comparable o0;
-          Comparable o1;
-          switch (call.getKind()) {
-          case LESS_THAN:
-          case LESS_THAN_OR_EQUAL:
-          case GREATER_THAN:
-          case GREATER_THAN_OR_EQUAL:
-          case EQUALS:
-          case NOT_EQUALS:
-            args = lazyArgs(context);
-            o0 = (Comparable) args.get(0);
-            if (o0 == null) {
-              return null;
-            }
-            o1 = (Comparable) args.get(1);
-            if (o1 == null) {
-              return null;
-            }
-            if (o0 instanceof BigDecimal) {
-              if (o1 instanceof Double || o1 instanceof Float) {
-                o1 = new BigDecimal(((Number) o1).doubleValue());
-              } else {
-                o1 = new BigDecimal(((Number) o1).longValue());
-              }
-            }
-            if (o1 instanceof BigDecimal) {
-              if (o0 instanceof Double || o0 instanceof Float) {
-                o0 = new BigDecimal(((Number) o0).doubleValue());
-              } else {
-                o0 = new BigDecimal(((Number) o0).longValue());
-              }
-            }
-            final int c = o0.compareTo(o1);
+  public Scalar compile(List<RexNode> nodes, List<RelNode> inputs) {
+    return scalarCompiler.compile(inputs, nodes);
+  }
+
+  /** Not used. */
+  private class FooCompiler implements ScalarCompiler {
+    public Scalar compile(List<RelNode> inputs, List<RexNode> nodes) {
+      final RexNode node = nodes.get(0);
+      if (node instanceof RexCall) {
+        final RexCall call = (RexCall) node;
+        final Scalar argScalar = compile(inputs, call.getOperands());
+        return new Scalar() {
+          final Object[] args = new Object[call.getOperands().size()];
+
+          public void execute(final Context context, Object[] results) {
+            results[0] = execute(context);
+          }
+
+          public Object execute(Context context) {
+            Comparable o0;
+            Comparable o1;
             switch (call.getKind()) {
             case LESS_THAN:
-              return c < 0;
             case LESS_THAN_OR_EQUAL:
-              return c <= 0;
             case GREATER_THAN:
-              return c > 0;
             case GREATER_THAN_OR_EQUAL:
-              return c >= 0;
             case EQUALS:
-              return c == 0;
             case NOT_EQUALS:
-              return c != 0;
+              argScalar.execute(context, args);
+              o0 = (Comparable) args[0];
+              if (o0 == null) {
+                return null;
+              }
+              o1 = (Comparable) args[1];
+              if (o1 == null) {
+                return null;
+              }
+              if (o0 instanceof BigDecimal) {
+                if (o1 instanceof Double || o1 instanceof Float) {
+                  o1 = new BigDecimal(((Number) o1).doubleValue());
+                } else {
+                  o1 = new BigDecimal(((Number) o1).longValue());
+                }
+              }
+              if (o1 instanceof BigDecimal) {
+                if (o0 instanceof Double || o0 instanceof Float) {
+                  o0 = new BigDecimal(((Number) o0).doubleValue());
+                } else {
+                  o0 = new BigDecimal(((Number) o0).longValue());
+                }
+              }
+              final int c = o0.compareTo(o1);
+              switch (call.getKind()) {
+              case LESS_THAN:
+                return c < 0;
+              case LESS_THAN_OR_EQUAL:
+                return c <= 0;
+              case GREATER_THAN:
+                return c > 0;
+              case GREATER_THAN_OR_EQUAL:
+                return c >= 0;
+              case EQUALS:
+                return c == 0;
+              case NOT_EQUALS:
+                return c != 0;
+              default:
+                throw new AssertionError("unknown expression " + call);
+              }
             default:
+              if (call.getOperator() == SqlStdOperatorTable.UPPER) {
+                argScalar.execute(context, args);
+                String s0 = (String) args[0];
+                if (s0 == null) {
+                  return null;
+                }
+                return s0.toUpperCase();
+              }
+              if (call.getOperator() == SqlStdOperatorTable.SUBSTRING) {
+                argScalar.execute(context, args);
+                String s0 = (String) args[0];
+                Number i1 = (Number) args[1];
+                Number i2 = (Number) args[2];
+                if (s0 == null || i1 == null || i2 == null) {
+                  return null;
+                }
+                return s0.substring(i1.intValue() - 1,
+                    i1.intValue() - 1 + i2.intValue());
+              }
               throw new AssertionError("unknown expression " + call);
             }
-          default:
-            throw new AssertionError("unknown expression " + call);
           }
+        };
+      }
+      return new Scalar() {
+        public void execute(Context context, Object[] results) {
+          results[0] = execute(context);
         }
 
-        private List<Object> lazyArgs(final Context context) {
-          return new AbstractList<Object>() {
-            @Override public Object get(int index) {
-              return scalars.get(index).execute(context);
-            }
-
-            @Override public int size() {
-              return scalars.size();
-            }
-          };
+        public Object execute(Context context) {
+          switch (node.getKind()) {
+          case LITERAL:
+            return ((RexLiteral) node).getValue();
+          case INPUT_REF:
+            return context.values[((RexInputRef) node).getIndex()];
+          default:
+            throw new RuntimeException("unknown expression type " + node);
+          }
         }
       };
     }
-    return new Scalar() {
-      public Object execute(Context context) {
-        switch (node.getKind()) {
-        case LITERAL:
-          return ((RexLiteral) node).getValue();
-        case INPUT_REF:
-          return context.values[((RexInputRef) node).getIndex()];
-        default:
-          throw new RuntimeException("unknown expression type " + node);
-        }
-      }
-    };
   }
 
   public Source source(RelNode rel, int ordinal) {
@@ -224,7 +246,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
   }
 
   public Context createContext() {
-    return new Context();
+    return new Context(dataContext);
   }
 
   public DataContext getDataContext() {
@@ -368,6 +390,12 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
     public void rewrite(RelNode r) {
     }
   }
+
+  /** Converts a list of expressions to a scalar that can compute their
+   * values. */
+  interface ScalarCompiler {
+    Scalar compile(List<RelNode> inputs, List<RexNode> nodes);
+  }
 }
 
 // End Interpreter.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java
new file mode 100644
index 0000000..a4b6eb0
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.runtime.ArrayBindable;
+
+/**
+ * Utilities relating to {@link org.apache.calcite.interpreter.Interpreter}
+ * and {@link org.apache.calcite.interpreter.InterpretableConvention}.
+ */
+public class Interpreters {
+  private Interpreters() {}
+
+  /** Creates a {@link org.apache.calcite.runtime.Bindable} that interprets a
+   * given relational expression. */
+  public static ArrayBindable bindable(final RelNode rel) {
+    if (rel instanceof ArrayBindable) {
+      // E.g. if rel instanceof BindableRel
+      return (ArrayBindable) rel;
+    }
+    return new ArrayBindable() {
+      public Enumerable<Object[]> bind(DataContext dataContext) {
+        return new Interpreter(dataContext, rel);
+      }
+
+      public Class<Object[]> getElementType() {
+        return Object[].class;
+      }
+    };
+  }
+}
+
+// End Interpreters.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java b/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
new file mode 100644
index 0000000..491efa9
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.commons.compiler.CompilerFactoryFactory;
+import org.codehaus.commons.compiler.IClassBodyEvaluator;
+import org.codehaus.commons.compiler.ICompilerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Compiles a scalar expression ({@link RexNode}) to an expression that
+ * can be evaluated ({@link Scalar}) by generating a Java AST and compiling it
+ * to a class using Janino.
+ */
+public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
+  private final RexBuilder rexBuilder;
+
+  public JaninoRexCompiler(RexBuilder rexBuilder) {
+    this.rexBuilder = rexBuilder;
+  }
+
+  public Scalar compile(List<RelNode> inputs, List<RexNode> nodes) {
+    final RelDataTypeFactory.FieldInfoBuilder fieldBuilder =
+        rexBuilder.getTypeFactory().builder();
+    for (RelNode input : inputs) {
+      fieldBuilder.addAll(input.getRowType().getFieldList());
+    }
+    final RelDataType inputRowType = fieldBuilder.build();
+    final RexProgramBuilder programBuilder =
+        new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+    final RexProgram program = programBuilder.getProgram();
+
+    final BlockBuilder builder = new BlockBuilder();
+    final ParameterExpression context_ =
+        Expressions.parameter(Context.class, "context");
+    final ParameterExpression outputValues_ =
+        Expressions.parameter(Object[].class, "outputValues");
+    final JavaTypeFactoryImpl javaTypeFactory =
+        new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+    // public void execute(Context, Object[] outputValues)
+    final RexToLixTranslator.InputGetter inputGetter =
+        new RexToLixTranslator.InputGetterImpl(
+            ImmutableList.of(
+                Pair.<Expression, PhysType>of(
+                    Expressions.field(context_,
+                        BuiltInMethod.CONTEXT_VALUES.field),
+                    PhysTypeImpl.of(javaTypeFactory, inputRowType,
+                        JavaRowFormat.ARRAY, false))));
+    final Function1<String, RexToLixTranslator.InputGetter> correlates =
+        new Function1<String, RexToLixTranslator.InputGetter>() {
+          public RexToLixTranslator.InputGetter apply(String a0) {
+            throw new UnsupportedOperationException();
+          }
+        };
+    final Expression root =
+        Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
+    final List<Expression> list =
+        RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
+            null, root, inputGetter, correlates);
+    for (int i = 0; i < list.size(); i++) {
+      builder.add(
+          Expressions.statement(
+              Expressions.assign(
+                  Expressions.arrayIndex(outputValues_,
+                      Expressions.constant(i)),
+                  list.get(i))));
+    }
+    return baz(context_, outputValues_, builder.toBlock());
+  }
+
+  /** Given a method that implements {@link Scalar#execute(Context, Object[])},
+   * adds a bridge method that implements {@link Scalar#execute(Context)}, and
+   * compiles. */
+  static Scalar baz(ParameterExpression context_,
+      ParameterExpression outputValues_, BlockStatement block) {
+    final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+    // public void execute(Context, Object[] outputValues)
+    declarations.add(
+        Expressions.methodDecl(Modifier.PUBLIC, void.class,
+            BuiltInMethod.SCALAR_EXECUTE2.method.getName(),
+            ImmutableList.of(context_, outputValues_), block));
+
+    // public Object execute(Context)
+    final BlockBuilder builder = new BlockBuilder();
+    final Expression values_ = builder.append("values",
+        Expressions.newArrayBounds(Object.class, 1,
+            Expressions.constant(1)));
+    builder.add(
+        Expressions.statement(
+            Expressions.call(
+                Expressions.parameter(Scalar.class, "this"),
+                BuiltInMethod.SCALAR_EXECUTE2.method, context_, values_)));
+    builder.add(
+        Expressions.return_(null,
+            Expressions.arrayIndex(values_, Expressions.constant(0))));
+    declarations.add(
+        Expressions.methodDecl(Modifier.PUBLIC, Object.class,
+            BuiltInMethod.SCALAR_EXECUTE1.method.getName(),
+            ImmutableList.of(context_), builder.toBlock()));
+
+    final ClassDeclaration classDeclaration =
+        Expressions.classDecl(Modifier.PUBLIC, "Buzz", null,
+            ImmutableList.<Type>of(Scalar.class), declarations);
+    String s = Expressions.toString(declarations, "\n", false);
+    if (CalcitePrepareImpl.DEBUG) {
+      Util.debugCode(System.out, s);
+    }
+    try {
+      return getScalar(classDeclaration, s);
+    } catch (CompileException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static Scalar getScalar(ClassDeclaration expr, String s)
+      throws CompileException, IOException {
+    ICompilerFactory compilerFactory;
+    try {
+      compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory();
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Unable to instantiate java compiler", e);
+    }
+    IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
+    cbe.setClassName(expr.name);
+    cbe.setImplementedInterfaces(new Class[]{Scalar.class});
+    cbe.setParentClassLoader(JaninoRexCompiler.class.getClassLoader());
+    if (CalcitePrepareImpl.DEBUG) {
+      // Add line numbers to the generated janino class
+      cbe.setDebuggingInformation(true, true, true);
+    }
+    return (Scalar) cbe.createInstance(new StringReader(s));
+  }
+}
+
+// End JaninoRexCompiler.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
new file mode 100644
index 0000000..498c8a3
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.core.Join;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.Join}.
+ */
+public class JoinNode implements Node {
+  private final Source leftSource;
+  private final Source rightSource;
+  private final Sink sink;
+  private final Join rel;
+  private final Scalar condition;
+  private final Context context;
+
+  public JoinNode(Interpreter interpreter, Join rel) {
+    this.leftSource = interpreter.source(rel, 0);
+    this.rightSource = interpreter.source(rel, 1);
+    this.sink = interpreter.sink(rel);
+    this.condition = interpreter.compile(ImmutableList.of(rel.getCondition()),
+        rel.getInputs());
+    this.rel = rel;
+    this.context = interpreter.createContext();
+
+  }
+
+  public void run() throws InterruptedException {
+    List<Row> rightList = null;
+    final int leftCount = rel.getLeft().getRowType().getFieldCount();
+    final int rightCount = rel.getRight().getRowType().getFieldCount();
+    context.values = new Object[rel.getRowType().getFieldCount()];
+    Row left;
+    Row right;
+    while ((left = leftSource.receive()) != null) {
+      System.arraycopy(left.getValues(), 0, context.values, 0, leftCount);
+      if (rightList == null) {
+        rightList = Lists.newArrayList();
+        while ((right = rightSource.receive()) != null) {
+          rightList.add(right);
+        }
+      }
+      for (Row right2 : rightList) {
+        System.arraycopy(right2.getValues(), 0, context.values, leftCount,
+            rightCount);
+        final Boolean execute = (Boolean) condition.execute(context);
+        if (execute != null && execute) {
+          sink.send(Row.asCopy(context.values));
+        }
+      }
+    }
+  }
+}
+
+// End JoinNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
index b09e7c3..1e067bb 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
@@ -24,10 +24,13 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Calc;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.rules.FilterTableRule;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.FilterableTable;
@@ -149,16 +152,29 @@ public class Nodes {
     }
 
     public void visit(TableScan scan) {
-      node = new ScanNode(interpreter, scan, ImmutableList.<RexNode>of(), null);
+      node = new TableScanNode(interpreter, scan, ImmutableList.<RexNode>of(),
+          null);
     }
 
     public void visit(FilterScan scan) {
-      node = new ScanNode(interpreter, scan, scan.filters, scan.projects);
+      node = new TableScanNode(interpreter, scan, scan.filters, scan.projects);
     }
 
     public void visit(Sort sort) {
       node = new SortNode(interpreter, sort);
     }
+
+    public void visit(Union union) {
+      node = new UnionNode(interpreter, union);
+    }
+
+    public void visit(Join join) {
+      node = new JoinNode(interpreter, join);
+    }
+
+    public void visit(Window window) {
+      node = new WindowNode(interpreter, window);
+    }
   }
 
   /** Table scan that applies filters and optionally projects. Only used in an

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java b/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java
new file mode 100644
index 0000000..ab41cac
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+
+/**
+ * Rule to convert a relational expression from
+ * {@link org.apache.calcite.plan.Convention#NONE}
+ * to {@link org.apache.calcite.interpreter.BindableConvention}.
+ */
+public class NoneToBindableConverterRule extends ConverterRule {
+  public static final ConverterRule INSTANCE =
+      new NoneToBindableConverterRule();
+
+  private NoneToBindableConverterRule() {
+    super(RelNode.class, Convention.NONE, BindableConvention.INSTANCE,
+        "NoneToBindableConverterRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+    return new InterpretableConverter(rel.getCluster(), newTraitSet, rel);
+  }
+}
+
+// End NoneToBindableConverterRule.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
index bca464b..1737b8b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
@@ -17,28 +17,20 @@
 package org.apache.calcite.interpreter;
 
 import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexNode;
-
-import com.google.common.collect.ImmutableList;
 
 /**
  * Interpreter node that implements a
  * {@link org.apache.calcite.rel.logical.LogicalFilter}.
  */
-public class ProjectNode implements Node {
-  private final ImmutableList<Scalar> projects;
-  private final Source source;
-  private final Sink sink;
+public class ProjectNode extends AbstractSingleNode<Project> {
+  private final Scalar scalar;
   private final Context context;
+  private final int projectCount;
 
   public ProjectNode(Interpreter interpreter, Project rel) {
-    ImmutableList.Builder<Scalar> builder = ImmutableList.builder();
-    for (RexNode node : rel.getProjects()) {
-      builder.add(interpreter.compile(node));
-    }
-    this.projects = builder.build();
-    this.source = interpreter.source(rel, 0);
-    this.sink = interpreter.sink(rel);
+    super(interpreter, rel);
+    this.projectCount = rel.getProjects().size();
+    this.scalar = interpreter.compile(rel.getProjects(), rel.getInputs());
     this.context = interpreter.createContext();
   }
 
@@ -46,11 +38,8 @@ public class ProjectNode implements Node {
     Row row;
     while ((row = source.receive()) != null) {
       context.values = row.getValues();
-      Object[] values = new Object[projects.size()];
-      for (int i = 0; i < projects.size(); i++) {
-        Scalar scalar = projects.get(i);
-        values[i] = scalar.execute(context);
-      }
+      Object[] values = new Object[projectCount];
+      scalar.execute(context, values);
       sink.send(new Row(values));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Scalar.java b/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
index 2f7e923..795e9d4 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
@@ -21,6 +21,7 @@ package org.apache.calcite.interpreter;
  */
 public interface Scalar {
   Object execute(Context context);
+  void execute(Context context, Object[] results);
 }
 
 // End Scalar.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java
deleted file mode 100644
index 2688353..0000000
--- a/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.interpreter;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.runtime.Enumerables;
-import org.apache.calcite.schema.FilterableTable;
-import org.apache.calcite.schema.ProjectableFilterableTable;
-import org.apache.calcite.schema.QueryableTable;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Util;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * Interpreter node that implements a
- * {@link org.apache.calcite.rel.core.TableScan}.
- */
-public class ScanNode implements Node {
-  private final Sink sink;
-  private final TableScan rel;
-  private final ImmutableList<RexNode> filters;
-  private final DataContext root;
-  private final int[] projects;
-
-  public ScanNode(Interpreter interpreter, TableScan rel,
-      ImmutableList<RexNode> filters, ImmutableIntList projects) {
-    this.rel = rel;
-    this.filters = Preconditions.checkNotNull(filters);
-    this.projects = projects == null ? null : projects.toIntArray();
-    this.sink = interpreter.sink(rel);
-    this.root = interpreter.getDataContext();
-  }
-
-  public void run() throws InterruptedException {
-    final Enumerable<Row> iterable = iterable();
-    final Enumerator<Row> enumerator = iterable.enumerator();
-    while (enumerator.moveNext()) {
-      sink.send(enumerator.current());
-    }
-    enumerator.close();
-    sink.end();
-  }
-
-  private Enumerable<Row> iterable() {
-    final RelOptTable table = rel.getTable();
-    final ProjectableFilterableTable pfTable =
-        table.unwrap(ProjectableFilterableTable.class);
-    if (pfTable != null) {
-      final List<RexNode> filters1 = Lists.newArrayList(filters);
-      final int[] projects1 =
-          isIdentity(projects, rel.getRowType().getFieldCount())
-              ? null : projects;
-      final Enumerable<Object[]> enumerator =
-          pfTable.scan(root, filters1, projects1);
-      assert filters1.isEmpty()
-          : "table could not handle a filter it earlier said it could";
-      return Enumerables.toRow(enumerator);
-    }
-    if (projects != null) {
-      throw new AssertionError("have projects, but table cannot handle them");
-    }
-    final FilterableTable filterableTable =
-        table.unwrap(FilterableTable.class);
-    if (filterableTable != null) {
-      final List<RexNode> filters1 = Lists.newArrayList(filters);
-      final Enumerable<Object[]> enumerator =
-          filterableTable.scan(root, filters1);
-      assert filters1.isEmpty()
-          : "table could not handle a filter it earlier said it could";
-      return Enumerables.toRow(enumerator);
-    }
-    if (!filters.isEmpty()) {
-      throw new AssertionError("have filters, but table cannot handle them");
-    }
-    //noinspection unchecked
-    Enumerable<Row> iterable = table.unwrap(Enumerable.class);
-    if (iterable != null) {
-      return iterable;
-    }
-    final QueryableTable queryableTable = table.unwrap(QueryableTable.class);
-    if (queryableTable != null) {
-      final Type elementType = queryableTable.getElementType();
-      SchemaPlus schema = root.getRootSchema();
-      for (String name : Util.skipLast(table.getQualifiedName())) {
-        schema = schema.getSubSchema(name);
-      }
-      if (elementType instanceof Class) {
-        //noinspection unchecked
-        final Queryable<Object> queryable = Schemas.queryable(root,
-            (Class) elementType, table.getQualifiedName());
-        ImmutableList.Builder<Field> fieldBuilder = ImmutableList.builder();
-        Class type = (Class) elementType;
-        for (Field field : type.getFields()) {
-          if (Modifier.isPublic(field.getModifiers())
-              && !Modifier.isStatic(field.getModifiers())) {
-            fieldBuilder.add(field);
-          }
-        }
-        final List<Field> fields = fieldBuilder.build();
-        return queryable.select(
-            new Function1<Object, Row>() {
-              public Row apply(Object o) {
-                final Object[] values = new Object[fields.size()];
-                for (int i = 0; i < fields.size(); i++) {
-                  Field field = fields.get(i);
-                  try {
-                    values[i] = field.get(o);
-                  } catch (IllegalAccessException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-                return new Row(values);
-              }
-            });
-      } else {
-        return Schemas.queryable(root, Row.class,
-            table.getQualifiedName());
-      }
-    }
-    final ScannableTable scannableTable =
-        table.unwrap(ScannableTable.class);
-    if (scannableTable != null) {
-      return Enumerables.toRow(scannableTable.scan(root));
-    }
-    throw new AssertionError("cannot convert table " + table + " to iterable");
-  }
-
-  private static boolean isIdentity(int[] is, int count) {
-    if (is.length != count) {
-      return false;
-    }
-    for (int i = 0; i < is.length; i++) {
-      if (is[i] != i) {
-        return false;
-      }
-    }
-    return true;
-  }
-}
-
-// End ScanNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
index e5fb91d..1e5f07b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
@@ -34,15 +34,9 @@ import java.util.List;
  * Interpreter node that implements a
  * {@link org.apache.calcite.rel.core.Sort}.
  */
-public class SortNode implements Node {
-  private final Source source;
-  private final Sink sink;
-  private final Sort rel;
-
+public class SortNode extends AbstractSingleNode<Sort> {
   public SortNode(Interpreter interpreter, Sort rel) {
-    this.rel = rel;
-    this.source = interpreter.source(rel, 0);
-    this.sink = interpreter.sink(rel);
+    super(interpreter, rel);
   }
 
   public void run() throws InterruptedException {
@@ -103,30 +97,55 @@ public class SortNode implements Node {
             }));
   }
 
+  private static int compare(Comparable c1, Comparable c2,
+      int nullComparison) {
+    if (c1 == c2) {
+      return 0;
+    } else if (c1 == null) {
+      return nullComparison;
+    } else if (c2 == null) {
+      return -nullComparison;
+    } else {
+      //noinspection unchecked
+      return c1.compareTo(c2);
+    }
+  }
+
   private Comparator<Row> comparator(final RelFieldCollation fieldCollation) {
+    final int nullComparison = getNullComparison(fieldCollation.nullDirection);
     switch (fieldCollation.direction) {
     case ASCENDING:
       return new Comparator<Row>() {
-        final int x = fieldCollation.getFieldIndex();
         public int compare(Row o1, Row o2) {
+          final int x = fieldCollation.getFieldIndex();
           final Comparable c1 = (Comparable) o1.getValues()[x];
           final Comparable c2 = (Comparable) o2.getValues()[x];
-          //noinspection unchecked
-          return c1.compareTo(c2);
+          return SortNode.compare(c1, c2, nullComparison);
         }
       };
     default:
       return new Comparator<Row>() {
-        final int x = fieldCollation.getFieldIndex();
         public int compare(Row o1, Row o2) {
+          final int x = fieldCollation.getFieldIndex();
           final Comparable c1 = (Comparable) o1.getValues()[x];
           final Comparable c2 = (Comparable) o2.getValues()[x];
-          //noinspection unchecked
-          return c2.compareTo(c1);
+          return SortNode.compare(c2, c1, -nullComparison);
         }
       };
     }
   }
+
+  private int getNullComparison(RelFieldCollation.NullDirection nullDirection) {
+    switch (nullDirection) {
+    case FIRST:
+      return -1;
+    case UNSPECIFIED:
+    case LAST:
+      return 1;
+    default:
+      throw new AssertionError(nullDirection);
+    }
+  }
 }
 
 // End SortNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
new file mode 100644
index 0000000..6f2c90f
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.Enumerables;
+import org.apache.calcite.schema.FilterableTable;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.TableScan}.
+ */
+public class TableScanNode implements Node {
+  private final Sink sink;
+  private final TableScan rel;
+  private final ImmutableList<RexNode> filters;
+  private final DataContext root;
+  private final int[] projects;
+
+  TableScanNode(Interpreter interpreter, TableScan rel,
+      ImmutableList<RexNode> filters, ImmutableIntList projects) {
+    this.rel = rel;
+    this.filters = Preconditions.checkNotNull(filters);
+    this.projects = projects == null ? null : projects.toIntArray();
+    this.sink = interpreter.sink(rel);
+    this.root = interpreter.getDataContext();
+  }
+
+  public void run() throws InterruptedException {
+    final Enumerable<Row> iterable = iterable();
+    final Enumerator<Row> enumerator = iterable.enumerator();
+    while (enumerator.moveNext()) {
+      sink.send(enumerator.current());
+    }
+    enumerator.close();
+    sink.end();
+  }
+
+  private Enumerable<Row> iterable() {
+    final RelOptTable table = rel.getTable();
+    final ProjectableFilterableTable pfTable =
+        table.unwrap(ProjectableFilterableTable.class);
+    if (pfTable != null) {
+      final List<RexNode> filters1 = Lists.newArrayList(filters);
+      final int[] projects1 =
+          projects == null
+              || isIdentity(projects, rel.getRowType().getFieldCount())
+              ? null : projects;
+      final Enumerable<Object[]> enumerator =
+          pfTable.scan(root, filters1, projects1);
+      assert filters1.isEmpty()
+          : "table could not handle a filter it earlier said it could";
+      return Enumerables.toRow(enumerator);
+    }
+    if (projects != null) {
+      throw new AssertionError("have projects, but table cannot handle them");
+    }
+    final FilterableTable filterableTable =
+        table.unwrap(FilterableTable.class);
+    if (filterableTable != null) {
+      final List<RexNode> filters1 = Lists.newArrayList(filters);
+      final Enumerable<Object[]> enumerator =
+          filterableTable.scan(root, filters1);
+      assert filters1.isEmpty()
+          : "table could not handle a filter it earlier said it could";
+      return Enumerables.toRow(enumerator);
+    }
+    if (!filters.isEmpty()) {
+      throw new AssertionError("have filters, but table cannot handle them");
+    }
+    final ScannableTable scannableTable =
+        table.unwrap(ScannableTable.class);
+    if (scannableTable != null) {
+      return Enumerables.toRow(scannableTable.scan(root));
+    }
+    //noinspection unchecked
+    Enumerable<Row> iterable = table.unwrap(Enumerable.class);
+    if (iterable != null) {
+      return iterable;
+    }
+    final QueryableTable queryableTable = table.unwrap(QueryableTable.class);
+    if (queryableTable != null) {
+      final Type elementType = queryableTable.getElementType();
+      SchemaPlus schema = root.getRootSchema();
+      for (String name : Util.skipLast(table.getQualifiedName())) {
+        schema = schema.getSubSchema(name);
+      }
+      if (elementType instanceof Class) {
+        //noinspection unchecked
+        final Queryable<Object> queryable = Schemas.queryable(root,
+            (Class) elementType, table.getQualifiedName());
+        ImmutableList.Builder<Field> fieldBuilder = ImmutableList.builder();
+        Class type = (Class) elementType;
+        for (Field field : type.getFields()) {
+          if (Modifier.isPublic(field.getModifiers())
+              && !Modifier.isStatic(field.getModifiers())) {
+            fieldBuilder.add(field);
+          }
+        }
+        final List<Field> fields = fieldBuilder.build();
+        return queryable.select(
+            new Function1<Object, Row>() {
+              public Row apply(Object o) {
+                final Object[] values = new Object[fields.size()];
+                for (int i = 0; i < fields.size(); i++) {
+                  Field field = fields.get(i);
+                  try {
+                    values[i] = field.get(o);
+                  } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                  }
+                }
+                return new Row(values);
+              }
+            });
+      } else {
+        return Schemas.queryable(root, Row.class,
+            table.getQualifiedName());
+      }
+    }
+    throw new AssertionError("cannot convert table " + table + " to iterable");
+  }
+
+  private static boolean isIdentity(int[] is, int count) {
+    if (is.length != count) {
+      return false;
+    }
+    for (int i = 0; i < is.length; i++) {
+      if (is[i] != i) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
+
+// End TableScanNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
new file mode 100644
index 0000000..701dbc7
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.core.Union;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.Union}.
+ */
+public class UnionNode implements Node {
+  private final ImmutableList<Source> sources;
+  private final Sink sink;
+  private final Union rel;
+
+  public UnionNode(Interpreter interpreter, Union rel) {
+    ImmutableList.Builder<Source> builder = ImmutableList.builder();
+    for (int i = 0; i < rel.getInputs().size(); i++) {
+      builder.add(interpreter.source(rel, i));
+    }
+    this.sources = builder.build();
+    this.sink = interpreter.sink(rel);
+    this.rel = rel;
+  }
+
+  public void run() throws InterruptedException {
+    final Set<Row> rows = rel.all ? null : Sets.<Row>newHashSet();
+    for (Source source : sources) {
+      Row row;
+      while ((row = source.receive()) != null) {
+        if (rows == null || rows.add(row)) {
+          sink.send(row);
+        }
+      }
+    }
+  }
+}
+
+// End UnionNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
index fe68790..ff8c950 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
@@ -16,8 +16,13 @@
  */
 package org.apache.calcite.interpreter;
 
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Values;
 import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 
@@ -27,23 +32,38 @@ import java.util.List;
  */
 public class ValuesNode implements Node {
   private final Sink sink;
-  private final Values rel;
   private final int fieldCount;
+  private final ImmutableList<Row> rows;
 
   public ValuesNode(Interpreter interpreter, Values rel) {
-    this.rel = rel;
     this.sink = interpreter.sink(rel);
     this.fieldCount = rel.getRowType().getFieldCount();
+    this.rows = createRows(interpreter, rel.getTuples());
+  }
+
+  private ImmutableList<Row> createRows(Interpreter interpreter,
+      ImmutableList<ImmutableList<RexLiteral>> tuples) {
+    final List<RexNode> nodes = Lists.newArrayList();
+    for (ImmutableList<RexLiteral> tuple : tuples) {
+      nodes.addAll(tuple);
+    }
+    final Scalar scalar =
+        interpreter.compile(nodes, ImmutableList.<RelNode>of());
+    final Object[] values = new Object[nodes.size()];
+    final Context context = interpreter.createContext();
+    scalar.execute(context, values);
+    final ImmutableList.Builder<Row> rows = ImmutableList.builder();
+    Object[] subValues = new Object[fieldCount];
+    for (int i = 0; i < values.length; i += fieldCount) {
+      System.arraycopy(values, i, subValues, 0, fieldCount);
+      rows.add(Row.asCopy(subValues));
+    }
+    return rows.build();
   }
 
   public void run() throws InterruptedException {
-    for (List<RexLiteral> list : rel.getTuples()) {
-      final Object[] values = new Object[fieldCount];
-      for (int i = 0; i < list.size(); i++) {
-        RexLiteral rexLiteral = list.get(i);
-        values[i] = rexLiteral.getValue();
-      }
-      sink.send(new Row(values));
+    for (Row row : rows) {
+      sink.send(row);
     }
     sink.end();
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
new file mode 100644
index 0000000..a03608c
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.core.Window;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.Window}.
+ */
+public class WindowNode extends AbstractSingleNode<Window> {
+  WindowNode(Interpreter interpreter, Window rel) {
+    super(interpreter, rel);
+  }
+
+  public void run() throws InterruptedException {
+    Row row;
+    while ((row = source.receive()) != null) {
+      sink.send(row);
+    }
+    sink.end();
+  }
+}
+
+// End WindowNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java b/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
index 00e61b1..acbdfe9 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
@@ -33,6 +33,7 @@ import org.apache.calcite.prepare.CalcitePrepareImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.runtime.ArrayBindable;
 import org.apache.calcite.runtime.Bindable;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.validate.SqlValidator;
@@ -104,7 +105,7 @@ public interface CalcitePrepare {
 
     boolean enabled();
 
-    Bindable compile(ClassDeclaration expr, String s);
+    ArrayBindable compile(ClassDeclaration expr, String s);
 
     Object sparkContext();
 
@@ -177,7 +178,7 @@ public interface CalcitePrepare {
         return false;
       }
 
-      public Bindable compile(ClassDeclaration expr, String s) {
+      public ArrayBindable compile(ClassDeclaration expr, String s) {
         throw new UnsupportedOperationException();
       }