You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/01/22 01:33:39 UTC
[2/3] incubator-calcite git commit: [CALCITE-544] Implement Union in
Interpreter;
[CALCITE-562] Implement inner JOIN in interpreter and improve handling of
scalar expressions
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
new file mode 100644
index 0000000..cfe9b37
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.enumerable.AggImplementor;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Utilities pertaining to {@link BindableRel} and {@link BindableConvention}.
+ */
+public class Bindables {
+ private Bindables() {}
+
+ public static final RelOptRule BINDABLE_TABLE_RULE =
+ new BindableTableScanRule();
+
+ public static final RelOptRule BINDABLE_FILTER_RULE =
+ new BindableFilterRule();
+
+ public static final RelOptRule BINDABLE_PROJECT_RULE =
+ new BindableProjectRule();
+
+ public static final RelOptRule BINDABLE_SORT_RULE =
+ new BindableSortRule();
+
+ public static final RelOptRule BINDABLE_JOIN_RULE =
+ new BindableJoinRule();
+
+ public static final RelOptRule BINDABLE_UNION_RULE =
+ new BindableUnionRule();
+
+ public static final RelOptRule BINDABLE_VALUES_RULE =
+ new BindableValuesRule();
+
+ public static final RelOptRule BINDABLE_AGGREGATE_RULE =
+ new BindableAggregateRule();
+
+ public static final RelOptRule BINDABLE_WINDOW_RULE =
+ new BindableWindowRule();
+
+ /** All rules that convert logical relational expression to bindable. */
+ public static final ImmutableList<RelOptRule> RULES =
+ ImmutableList.of(
+ NoneToBindableConverterRule.INSTANCE,
+ BINDABLE_TABLE_RULE,
+ BINDABLE_FILTER_RULE,
+ BINDABLE_PROJECT_RULE,
+ BINDABLE_SORT_RULE,
+ BINDABLE_JOIN_RULE,
+ BINDABLE_UNION_RULE,
+ BINDABLE_VALUES_RULE,
+ BINDABLE_AGGREGATE_RULE,
+ BINDABLE_WINDOW_RULE);
+
+ /** Helper method that converts a bindable relational expression into a
+ * record iterator.
+ *
+ * <p>Any bindable can be compiled; if its input is also bindable, it becomes
+ * part of the same compilation unit.
+ */
+ private static Enumerable<Object[]> help(DataContext dataContext,
+ BindableRel rel) {
+ return new Interpreter(dataContext, rel);
+ }
+
+ /** Rule that converts a {@link ScannableTable} to bindable convention. */
+ private static class BindableTableScanRule extends RelOptRule {
+ public BindableTableScanRule() {
+ super(operand(TableScan.class, none()));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final TableScan scan = call.rel(0);
+ call.transformTo(
+ new BindableTableScan(scan.getCluster(),
+ scan.getTraitSet().replace(BindableConvention.INSTANCE),
+ scan.getTable()));
+ }
+ }
+
+ /** Scan of a table that implements {@link ScannableTable} and therefore can
+ * be converted into an {@link Enumerable}. */
+ private static class BindableTableScan
+ extends TableScan implements BindableRel {
+ BindableTableScan(RelOptCluster cluster, RelTraitSet traits,
+ RelOptTable table) {
+ super(cluster, traits, table);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return table.unwrap(ScannableTable.class).scan(dataContext);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ throw new UnsupportedOperationException(); // TODO:
+ }
+ }
+
+ /** Rule that converts a {@link Filter} to bindable convention. */
+ private static class BindableFilterRule extends ConverterRule {
+ private BindableFilterRule() {
+ super(LogicalFilter.class, RelOptUtil.FILTER_PREDICATE, Convention.NONE,
+ BindableConvention.INSTANCE, "BindableFilterRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalFilter filter = (LogicalFilter) rel;
+ return new BindableFilter(rel.getCluster(),
+ rel.getTraitSet().replace(BindableConvention.INSTANCE),
+ convert(filter.getInput(),
+ filter.getInput().getTraitSet()
+ .replace(BindableConvention.INSTANCE)),
+ filter.getCondition());
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Filter}
+ * in bindable convention. */
+ public static class BindableFilter extends Filter implements BindableRel {
+ public BindableFilter(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode child, RexNode condition) {
+ super(cluster, traitSet, child, condition);
+ assert getConvention() instanceof BindableConvention;
+ }
+
+ public BindableFilter copy(RelTraitSet traitSet, RelNode input,
+ RexNode condition) {
+ return new BindableFilter(getCluster(), traitSet, input, condition);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new FilterNode(implementor.interpreter, this);
+ }
+ }
+
+ /**
+ * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
+ * to a {@link BindableProject}.
+ */
+ private static class BindableProjectRule extends ConverterRule {
+ BindableProjectRule() {
+ super(LogicalProject.class, RelOptUtil.PROJECT_PREDICATE, Convention.NONE,
+ BindableConvention.INSTANCE, "BindableProjectRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalProject project = (LogicalProject) rel;
+ return new BindableProject(rel.getCluster(),
+ rel.getTraitSet().replace(BindableConvention.INSTANCE),
+ convert(project.getInput(),
+ project.getInput().getTraitSet()
+ .replace(BindableConvention.INSTANCE)),
+ project.getProjects(),
+ project.getRowType(),
+ Project.Flags.BOXED);
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Project} in
+ * bindable calling convention. */
+ public static class BindableProject extends Project implements BindableRel {
+ public BindableProject(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode child, List<? extends RexNode> exps, RelDataType rowType,
+ int flags) {
+ super(cluster, traitSet, child, exps, rowType, flags);
+ assert getConvention() instanceof BindableConvention;
+ }
+
+ public BindableProject copy(RelTraitSet traitSet, RelNode input,
+ List<RexNode> exps, RelDataType rowType) {
+ return new BindableProject(getCluster(), traitSet, input, exps, rowType,
+ flags);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new ProjectNode(implementor.interpreter, this);
+ }
+ }
+
+ /**
+ * Rule to convert an {@link org.apache.calcite.rel.core.Sort} to a
+ * {@link org.apache.calcite.interpreter.Bindables.BindableSort}.
+ */
+ private static class BindableSortRule extends ConverterRule {
+ BindableSortRule() {
+ super(Sort.class, Convention.NONE, BindableConvention.INSTANCE,
+ "BindableSortRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final Sort sort = (Sort) rel;
+ final RelTraitSet traitSet =
+ sort.getTraitSet().replace(BindableConvention.INSTANCE);
+ final RelNode input = sort.getInput();
+ return new BindableSort(rel.getCluster(), traitSet,
+ convert(input,
+ input.getTraitSet().replace(BindableConvention.INSTANCE)),
+ sort.getCollation(), sort.offset, sort.fetch);
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Sort}
+ * bindable calling convention. */
+ public static class BindableSort extends Sort implements BindableRel {
+ public BindableSort(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traitSet, child, collation, offset, fetch);
+ assert getConvention() instanceof BindableConvention;
+ assert getConvention() == child.getConvention();
+ }
+
+ @Override public BindableSort copy(RelTraitSet traitSet, RelNode newInput,
+ RelCollation newCollation, RexNode offset, RexNode fetch) {
+ return new BindableSort(getCluster(), traitSet, newInput, newCollation,
+ offset, fetch);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new SortNode(implementor.interpreter, this);
+ }
+ }
+
+ /**
+ * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalJoin}
+ * to a {@link BindableJoin}.
+ */
+ private static class BindableJoinRule extends ConverterRule {
+ BindableJoinRule() {
+ super(LogicalJoin.class, Convention.NONE, BindableConvention.INSTANCE,
+ "BindableJoinRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalJoin join = (LogicalJoin) rel;
+ final BindableConvention out = BindableConvention.INSTANCE;
+ final RelTraitSet traitSet = join.getTraitSet().replace(out);
+ return new BindableJoin(rel.getCluster(), traitSet,
+ convert(join.getLeft(),
+ join.getLeft().getTraitSet()
+ .replace(BindableConvention.INSTANCE)),
+ convert(join.getRight(),
+ join.getRight().getTraitSet()
+ .replace(BindableConvention.INSTANCE)),
+ join.getCondition(), join.getJoinType(), join.getVariablesStopped());
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Join} in
+ * bindable calling convention. */
+ public static class BindableJoin extends Join implements BindableRel {
+ protected BindableJoin(RelOptCluster cluster, RelTraitSet traits,
+ RelNode left, RelNode right, RexNode condition, JoinRelType joinType,
+ Set<String> variablesStopped) {
+ super(cluster, traits, left, right, condition, joinType,
+ variablesStopped);
+ }
+
+ public BindableJoin copy(RelTraitSet traitSet, RexNode conditionExpr,
+ RelNode left, RelNode right, JoinRelType joinType,
+ boolean semiJoinDone) {
+ return new BindableJoin(getCluster(), traitSet, left, right,
+ conditionExpr, joinType, variablesStopped);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new JoinNode(implementor.interpreter, this);
+ }
+ }
+
+ /**
+ * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalUnion}
+ * to a {@link BindableUnion}.
+ */
+ private static class BindableUnionRule extends ConverterRule {
+ BindableUnionRule() {
+ super(LogicalUnion.class, Convention.NONE, BindableConvention.INSTANCE,
+ "BindableUnionRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalUnion union = (LogicalUnion) rel;
+ final BindableConvention out = BindableConvention.INSTANCE;
+ final RelTraitSet traitSet = union.getTraitSet().replace(out);
+ return new BindableUnion(rel.getCluster(), traitSet,
+ convertList(union.getInputs(), out), union.all);
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Union} in
+ * bindable calling convention. */
+ public static class BindableUnion extends Union implements BindableRel {
+ public BindableUnion(RelOptCluster cluster, RelTraitSet traitSet,
+ List<RelNode> inputs, boolean all) {
+ super(cluster, traitSet, inputs, all);
+ }
+
+ public BindableUnion copy(RelTraitSet traitSet, List<RelNode> inputs,
+ boolean all) {
+ return new BindableUnion(getCluster(), traitSet, inputs, all);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new UnionNode(implementor.interpreter, this);
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Values}
+ * in bindable calling convention. */
+ public static class BindableValues extends Values implements BindableRel {
+ BindableValues(RelOptCluster cluster, RelDataType rowType,
+ ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traitSet) {
+ super(cluster, rowType, tuples, traitSet);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ assert inputs.isEmpty();
+ return new BindableValues(getCluster(), rowType, tuples, traitSet);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new ValuesNode(implementor.interpreter, this);
+ }
+ }
+
+ /** Rule that converts a {@link Values} to bindable convention. */
+ private static class BindableValuesRule extends ConverterRule {
+ BindableValuesRule() {
+ super(LogicalValues.class, Convention.NONE, BindableConvention.INSTANCE,
+ "BindableValuesRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ LogicalValues values = (LogicalValues) rel;
+ return new BindableValues(values.getCluster(), values.getRowType(),
+ values.getTuples(),
+ values.getTraitSet().replace(BindableConvention.INSTANCE));
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Aggregate}
+ * in bindable calling convention. */
+ public static class BindableAggregate extends Aggregate
+ implements BindableRel {
+ public BindableAggregate(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode child,
+ boolean indicator,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls)
+ throws InvalidRelException {
+ super(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls);
+ assert getConvention() instanceof BindableConvention;
+
+ for (AggregateCall aggCall : aggCalls) {
+ if (aggCall.isDistinct()) {
+ throw new InvalidRelException(
+ "distinct aggregation not supported");
+ }
+ AggImplementor implementor2 =
+ RexImpTable.INSTANCE.get(aggCall.getAggregation(), false);
+ if (implementor2 == null) {
+ throw new InvalidRelException(
+ "aggregation " + aggCall.getAggregation() + " not supported");
+ }
+ }
+ }
+
+ @Override public BindableAggregate copy(RelTraitSet traitSet, RelNode input,
+ boolean indicator, ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ try {
+ return new BindableAggregate(getCluster(), traitSet, input, indicator,
+ groupSet, groupSets, aggCalls);
+ } catch (InvalidRelException e) {
+ // Semantic error not possible. Must be a bug. Convert to
+ // internal error.
+ throw new AssertionError(e);
+ }
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new AggregateNode(implementor.interpreter, this);
+ }
+ }
+
+ /** Rule that converts an {@link Aggregate} to bindable convention. */
+ private static class BindableAggregateRule extends ConverterRule {
+ BindableAggregateRule() {
+ super(LogicalAggregate.class, Convention.NONE,
+ BindableConvention.INSTANCE, "BindableAggregateRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalAggregate agg = (LogicalAggregate) rel;
+ final RelTraitSet traitSet =
+ agg.getTraitSet().replace(BindableConvention.INSTANCE);
+ try {
+ return new BindableAggregate(rel.getCluster(), traitSet,
+ convert(agg.getInput(), traitSet), agg.indicator, agg.getGroupSet(),
+ agg.getGroupSets(), agg.getAggCallList());
+ } catch (InvalidRelException e) {
+ RelOptPlanner.LOGGER.fine(e.toString());
+ return null;
+ }
+ }
+ }
+
+ /** Implementation of {@link org.apache.calcite.rel.core.Window}
+ * in bindable convention. */
+ public static class BindableWindow extends Window implements BindableRel {
+ /** Creates an BindableWindowRel. */
+ BindableWindow(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+ List<RexLiteral> constants, RelDataType rowType, List<Group> groups) {
+ super(cluster, traits, child, constants, rowType, groups);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new BindableWindow(getCluster(), traitSet, sole(inputs),
+ constants, rowType, groups);
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(BindableConvention.COST_MULTIPLIER);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return help(dataContext, this);
+ }
+
+ public Node implement(InterpreterImplementor implementor) {
+ return new WindowNode(implementor.interpreter, this);
+ }
+ }
+
+ /**
+ * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalWindow}
+ * to a {@link BindableWindow}.
+ */
+ private static class BindableWindowRule extends ConverterRule {
+ BindableWindowRule() {
+ super(LogicalWindow.class, Convention.NONE, BindableConvention.INSTANCE,
+ "BindableWindowRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalWindow winAgg = (LogicalWindow) rel;
+ final RelTraitSet traitSet =
+ winAgg.getTraitSet().replace(BindableConvention.INSTANCE);
+ final RelNode child = winAgg.getInput();
+ final RelNode convertedChild =
+ convert(child,
+ child.getTraitSet().replace(BindableConvention.INSTANCE));
+ return new BindableWindow(rel.getCluster(), traitSet, convertedChild,
+ winAgg.getConstants(), winAgg.getRowType(), winAgg.groups);
+ }
+ }
+}
+
+// End Bindables.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Context.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Context.java b/core/src/main/java/org/apache/calcite/interpreter/Context.java
index fc80319..f125454 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Context.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Context.java
@@ -16,12 +16,20 @@
*/
package org.apache.calcite.interpreter;
+import org.apache.calcite.DataContext;
+
/**
* Context for executing a scalar expression in an interpreter.
*/
public class Context {
+ public final DataContext root;
+
/** Values of incoming columns from all inputs. */
public Object[] values;
+
+ Context(DataContext root) {
+ this.root = root;
+ }
}
// End Context.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
index d5461c4..e06e12d 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java
@@ -18,20 +18,21 @@ package org.apache.calcite.interpreter;
import org.apache.calcite.rel.core.Filter;
+import com.google.common.collect.ImmutableList;
+
/**
* Interpreter node that implements a
* {@link org.apache.calcite.rel.core.Filter}.
*/
-public class FilterNode implements Node {
+public class FilterNode extends AbstractSingleNode<Filter> {
private final Scalar condition;
- private final Source source;
- private final Sink sink;
private final Context context;
public FilterNode(Interpreter interpreter, Filter rel) {
- this.condition = interpreter.compile(rel.getCondition());
- this.source = interpreter.source(rel, 0);
- this.sink = interpreter.sink(rel);
+ super(interpreter, rel);
+ this.condition =
+ interpreter.compile(ImmutableList.of(rel.getCondition()),
+ rel.getInputs());
this.context = interpreter.createContext();
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java
new file mode 100644
index 0000000..fb41d9d
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConvention.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+
+/**
+ * Calling convention that returns results as an
+ * {@link org.apache.calcite.linq4j.Enumerable} of object arrays.
+ *
+ * <p>Unlike enumerable convention, no code generation is required.
+ */
+public enum InterpretableConvention implements Convention {
+ INSTANCE;
+
+ @Override public String toString() {
+ return getName();
+ }
+
+ public Class getInterface() {
+ return EnumerableRel.class;
+ }
+
+ public String getName() {
+ return "INTERPRETABLE";
+ }
+
+ public RelTraitDef getTraitDef() {
+ return ConventionTraitDef.INSTANCE;
+ }
+
+ public boolean subsumes(RelTrait trait) {
+ return this == trait;
+ }
+
+ public void register(RelOptPlanner planner) {}
+}
+
+// End InterpretableConvention.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java
new file mode 100644
index 0000000..8b62174
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.runtime.ArrayBindable;
+
+import java.util.List;
+
+/**
+ * Relational expression that converts any relational expression input to
+ * {@link org.apache.calcite.interpreter.InterpretableConvention}, by wrapping
+ * it in an interpreter.
+ */
+public class InterpretableConverter extends ConverterImpl
+ implements ArrayBindable {
+ protected InterpretableConverter(RelOptCluster cluster, RelTraitSet traits,
+ RelNode input) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new InterpretableConverter(getCluster(), traitSet, sole(inputs));
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return new Interpreter(dataContext, getInput());
+ }
+}
+
+// End InterpretableConverter.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
new file mode 100644
index 0000000..340299e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.rel.RelNode;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Relational expression that can implement itself using an interpreter.
+ */
+public interface InterpretableRel extends RelNode {
+ /** Creates an interpreter node to implement this relational expression. */
+ Node implement(InterpreterImplementor implementor);
+
+ /** Context when a {@link RelNode} is being converted to an interpreter
+ * {@link Node}. */
+ class InterpreterImplementor {
+ public final Interpreter interpreter;
+ public final Map<String, Object> internalParameters =
+ Maps.newLinkedHashMap();
+ public final CalcitePrepare.SparkHandler spark;
+ public final DataContext dataContext;
+ public final Map<RelNode, List<Sink>> relSinks = Maps.newHashMap();
+
+ public InterpreterImplementor(Interpreter interpreter,
+ CalcitePrepare.SparkHandler spark,
+ DataContext dataContext) {
+ this.interpreter = interpreter;
+ this.spark = spark;
+ this.dataContext = dataContext;
+ }
+ }
+}
+
+// End InterpretableRel.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index cbcfa64..93f18ef 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -26,16 +26,15 @@ import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.ReflectUtil;
import org.apache.calcite.util.ReflectiveVisitDispatcher;
import org.apache.calcite.util.ReflectiveVisitor;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.math.BigDecimal;
-import java.util.AbstractList;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
@@ -53,9 +52,12 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
private final DataContext dataContext;
private final RelNode rootRel;
private final Map<RelNode, List<RelNode>> relInputs = Maps.newHashMap();
+ protected final ScalarCompiler scalarCompiler;
public Interpreter(DataContext dataContext, RelNode rootRel) {
this.dataContext = dataContext;
+ this.scalarCompiler =
+ new JaninoRexCompiler(rootRel.getCluster().getRexBuilder());
Compiler compiler = new Nodes.CoreCompiler(this);
this.rootRel = compiler.visitRoot(rootRel);
}
@@ -106,96 +108,116 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
}
/** Compiles an expression to an executable form. */
- public Scalar compile(final RexNode node) {
- if (node instanceof RexCall) {
- final RexCall call = (RexCall) node;
- final ImmutableList.Builder<Scalar> list = ImmutableList.builder();
- for (RexNode operand : call.getOperands()) {
- list.add(compile(operand));
- }
- final ImmutableList<Scalar> scalars = list.build();
- return new Scalar() {
- public Object execute(final Context context) {
- final List<Object> args;
- Comparable o0;
- Comparable o1;
- switch (call.getKind()) {
- case LESS_THAN:
- case LESS_THAN_OR_EQUAL:
- case GREATER_THAN:
- case GREATER_THAN_OR_EQUAL:
- case EQUALS:
- case NOT_EQUALS:
- args = lazyArgs(context);
- o0 = (Comparable) args.get(0);
- if (o0 == null) {
- return null;
- }
- o1 = (Comparable) args.get(1);
- if (o1 == null) {
- return null;
- }
- if (o0 instanceof BigDecimal) {
- if (o1 instanceof Double || o1 instanceof Float) {
- o1 = new BigDecimal(((Number) o1).doubleValue());
- } else {
- o1 = new BigDecimal(((Number) o1).longValue());
- }
- }
- if (o1 instanceof BigDecimal) {
- if (o0 instanceof Double || o0 instanceof Float) {
- o0 = new BigDecimal(((Number) o0).doubleValue());
- } else {
- o0 = new BigDecimal(((Number) o0).longValue());
- }
- }
- final int c = o0.compareTo(o1);
+ public Scalar compile(List<RexNode> nodes, List<RelNode> inputs) {
+ return scalarCompiler.compile(inputs, nodes);
+ }
+
+ /** Not used. */
+ private class FooCompiler implements ScalarCompiler {
+ public Scalar compile(List<RelNode> inputs, List<RexNode> nodes) {
+ final RexNode node = nodes.get(0);
+ if (node instanceof RexCall) {
+ final RexCall call = (RexCall) node;
+ final Scalar argScalar = compile(inputs, call.getOperands());
+ return new Scalar() {
+ final Object[] args = new Object[call.getOperands().size()];
+
+ public void execute(final Context context, Object[] results) {
+ results[0] = execute(context);
+ }
+
+ public Object execute(Context context) {
+ Comparable o0;
+ Comparable o1;
switch (call.getKind()) {
case LESS_THAN:
- return c < 0;
case LESS_THAN_OR_EQUAL:
- return c <= 0;
case GREATER_THAN:
- return c > 0;
case GREATER_THAN_OR_EQUAL:
- return c >= 0;
case EQUALS:
- return c == 0;
case NOT_EQUALS:
- return c != 0;
+ argScalar.execute(context, args);
+ o0 = (Comparable) args[0];
+ if (o0 == null) {
+ return null;
+ }
+ o1 = (Comparable) args[1];
+ if (o1 == null) {
+ return null;
+ }
+ if (o0 instanceof BigDecimal) {
+ if (o1 instanceof Double || o1 instanceof Float) {
+ o1 = new BigDecimal(((Number) o1).doubleValue());
+ } else {
+ o1 = new BigDecimal(((Number) o1).longValue());
+ }
+ }
+ if (o1 instanceof BigDecimal) {
+ if (o0 instanceof Double || o0 instanceof Float) {
+ o0 = new BigDecimal(((Number) o0).doubleValue());
+ } else {
+ o0 = new BigDecimal(((Number) o0).longValue());
+ }
+ }
+ final int c = o0.compareTo(o1);
+ switch (call.getKind()) {
+ case LESS_THAN:
+ return c < 0;
+ case LESS_THAN_OR_EQUAL:
+ return c <= 0;
+ case GREATER_THAN:
+ return c > 0;
+ case GREATER_THAN_OR_EQUAL:
+ return c >= 0;
+ case EQUALS:
+ return c == 0;
+ case NOT_EQUALS:
+ return c != 0;
+ default:
+ throw new AssertionError("unknown expression " + call);
+ }
default:
+ if (call.getOperator() == SqlStdOperatorTable.UPPER) {
+ argScalar.execute(context, args);
+ String s0 = (String) args[0];
+ if (s0 == null) {
+ return null;
+ }
+ return s0.toUpperCase();
+ }
+ if (call.getOperator() == SqlStdOperatorTable.SUBSTRING) {
+ argScalar.execute(context, args);
+ String s0 = (String) args[0];
+ Number i1 = (Number) args[1];
+ Number i2 = (Number) args[2];
+ if (s0 == null || i1 == null || i2 == null) {
+ return null;
+ }
+ return s0.substring(i1.intValue() - 1,
+ i1.intValue() - 1 + i2.intValue());
+ }
throw new AssertionError("unknown expression " + call);
}
- default:
- throw new AssertionError("unknown expression " + call);
}
+ };
+ }
+ return new Scalar() {
+ public void execute(Context context, Object[] results) {
+ results[0] = execute(context);
}
- private List<Object> lazyArgs(final Context context) {
- return new AbstractList<Object>() {
- @Override public Object get(int index) {
- return scalars.get(index).execute(context);
- }
-
- @Override public int size() {
- return scalars.size();
- }
- };
+ public Object execute(Context context) {
+ switch (node.getKind()) {
+ case LITERAL:
+ return ((RexLiteral) node).getValue();
+ case INPUT_REF:
+ return context.values[((RexInputRef) node).getIndex()];
+ default:
+ throw new RuntimeException("unknown expression type " + node);
+ }
}
};
}
- return new Scalar() {
- public Object execute(Context context) {
- switch (node.getKind()) {
- case LITERAL:
- return ((RexLiteral) node).getValue();
- case INPUT_REF:
- return context.values[((RexInputRef) node).getIndex()];
- default:
- throw new RuntimeException("unknown expression type " + node);
- }
- }
- };
}
public Source source(RelNode rel, int ordinal) {
@@ -224,7 +246,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
}
public Context createContext() {
- return new Context();
+ return new Context(dataContext);
}
public DataContext getDataContext() {
@@ -368,6 +390,12 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
public void rewrite(RelNode r) {
}
}
+
+ /** Converts a list of expressions to a scalar that can compute their
+ * values. */
+ interface ScalarCompiler {
+ Scalar compile(List<RelNode> inputs, List<RexNode> nodes);
+ }
}
// End Interpreter.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java
new file mode 100644
index 0000000..a4b6eb0
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreters.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.runtime.ArrayBindable;
+
+/**
+ * Utilities relating to {@link org.apache.calcite.interpreter.Interpreter}
+ * and {@link org.apache.calcite.interpreter.InterpretableConvention}.
+ */
+public class Interpreters {
+ private Interpreters() {}
+
+ /** Creates a {@link org.apache.calcite.runtime.Bindable} that interprets a
+ * given relational expression. */
+ public static ArrayBindable bindable(final RelNode rel) {
+ if (rel instanceof ArrayBindable) {
+ // E.g. if rel instanceof BindableRel
+ return (ArrayBindable) rel;
+ }
+ return new ArrayBindable() {
+ public Enumerable<Object[]> bind(DataContext dataContext) {
+ return new Interpreter(dataContext, rel);
+ }
+
+ public Class<Object[]> getElementType() {
+ return Object[].class;
+ }
+ };
+ }
+}
+
+// End Interpreters.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java b/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
new file mode 100644
index 0000000..491efa9
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/JaninoRexCompiler.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.commons.compiler.CompilerFactoryFactory;
+import org.codehaus.commons.compiler.IClassBodyEvaluator;
+import org.codehaus.commons.compiler.ICompilerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Compiles a scalar expression ({@link RexNode}) to an expression that
+ * can be evaluated ({@link Scalar}) by generating a Java AST and compiling it
+ * to a class using Janino.
+ */
+public class JaninoRexCompiler implements Interpreter.ScalarCompiler {
+ private final RexBuilder rexBuilder;
+
+ public JaninoRexCompiler(RexBuilder rexBuilder) {
+ this.rexBuilder = rexBuilder;
+ }
+
+ public Scalar compile(List<RelNode> inputs, List<RexNode> nodes) {
+ final RelDataTypeFactory.FieldInfoBuilder fieldBuilder =
+ rexBuilder.getTypeFactory().builder();
+ for (RelNode input : inputs) {
+ fieldBuilder.addAll(input.getRowType().getFieldList());
+ }
+ final RelDataType inputRowType = fieldBuilder.build();
+ final RexProgramBuilder programBuilder =
+ new RexProgramBuilder(inputRowType, rexBuilder);
+ for (RexNode node : nodes) {
+ programBuilder.addProject(node, null);
+ }
+ final RexProgram program = programBuilder.getProgram();
+
+ final BlockBuilder builder = new BlockBuilder();
+ final ParameterExpression context_ =
+ Expressions.parameter(Context.class, "context");
+ final ParameterExpression outputValues_ =
+ Expressions.parameter(Object[].class, "outputValues");
+ final JavaTypeFactoryImpl javaTypeFactory =
+ new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+ // public void execute(Context, Object[] outputValues)
+ final RexToLixTranslator.InputGetter inputGetter =
+ new RexToLixTranslator.InputGetterImpl(
+ ImmutableList.of(
+ Pair.<Expression, PhysType>of(
+ Expressions.field(context_,
+ BuiltInMethod.CONTEXT_VALUES.field),
+ PhysTypeImpl.of(javaTypeFactory, inputRowType,
+ JavaRowFormat.ARRAY, false))));
+ final Function1<String, RexToLixTranslator.InputGetter> correlates =
+ new Function1<String, RexToLixTranslator.InputGetter>() {
+ public RexToLixTranslator.InputGetter apply(String a0) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ final Expression root =
+ Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
+ final List<Expression> list =
+ RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
+ null, root, inputGetter, correlates);
+ for (int i = 0; i < list.size(); i++) {
+ builder.add(
+ Expressions.statement(
+ Expressions.assign(
+ Expressions.arrayIndex(outputValues_,
+ Expressions.constant(i)),
+ list.get(i))));
+ }
+ return baz(context_, outputValues_, builder.toBlock());
+ }
+
+ /** Given a method that implements {@link Scalar#execute(Context, Object[])},
+ * adds a bridge method that implements {@link Scalar#execute(Context)}, and
+ * compiles. */
+ static Scalar baz(ParameterExpression context_,
+ ParameterExpression outputValues_, BlockStatement block) {
+ final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+ // public void execute(Context, Object[] outputValues)
+ declarations.add(
+ Expressions.methodDecl(Modifier.PUBLIC, void.class,
+ BuiltInMethod.SCALAR_EXECUTE2.method.getName(),
+ ImmutableList.of(context_, outputValues_), block));
+
+ // public Object execute(Context)
+ final BlockBuilder builder = new BlockBuilder();
+ final Expression values_ = builder.append("values",
+ Expressions.newArrayBounds(Object.class, 1,
+ Expressions.constant(1)));
+ builder.add(
+ Expressions.statement(
+ Expressions.call(
+ Expressions.parameter(Scalar.class, "this"),
+ BuiltInMethod.SCALAR_EXECUTE2.method, context_, values_)));
+ builder.add(
+ Expressions.return_(null,
+ Expressions.arrayIndex(values_, Expressions.constant(0))));
+ declarations.add(
+ Expressions.methodDecl(Modifier.PUBLIC, Object.class,
+ BuiltInMethod.SCALAR_EXECUTE1.method.getName(),
+ ImmutableList.of(context_), builder.toBlock()));
+
+ final ClassDeclaration classDeclaration =
+ Expressions.classDecl(Modifier.PUBLIC, "Buzz", null,
+ ImmutableList.<Type>of(Scalar.class), declarations);
+ String s = Expressions.toString(declarations, "\n", false);
+ if (CalcitePrepareImpl.DEBUG) {
+ Util.debugCode(System.out, s);
+ }
+ try {
+ return getScalar(classDeclaration, s);
+ } catch (CompileException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static Scalar getScalar(ClassDeclaration expr, String s)
+ throws CompileException, IOException {
+ ICompilerFactory compilerFactory;
+ try {
+ compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory();
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Unable to instantiate java compiler", e);
+ }
+ IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
+ cbe.setClassName(expr.name);
+ cbe.setImplementedInterfaces(new Class[]{Scalar.class});
+ cbe.setParentClassLoader(JaninoRexCompiler.class.getClassLoader());
+ if (CalcitePrepareImpl.DEBUG) {
+ // Add line numbers to the generated janino class
+ cbe.setDebuggingInformation(true, true, true);
+ }
+ return (Scalar) cbe.createInstance(new StringReader(s));
+ }
+}
+
+// End JaninoRexCompiler.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
new file mode 100644
index 0000000..498c8a3
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.core.Join;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.Join}.
+ */
+public class JoinNode implements Node {
+ private final Source leftSource;
+ private final Source rightSource;
+ private final Sink sink;
+ private final Join rel;
+ private final Scalar condition;
+ private final Context context;
+
+ public JoinNode(Interpreter interpreter, Join rel) {
+ this.leftSource = interpreter.source(rel, 0);
+ this.rightSource = interpreter.source(rel, 1);
+ this.sink = interpreter.sink(rel);
+ this.condition = interpreter.compile(ImmutableList.of(rel.getCondition()),
+ rel.getInputs());
+ this.rel = rel;
+ this.context = interpreter.createContext();
+
+ }
+
+ public void run() throws InterruptedException {
+ List<Row> rightList = null;
+ final int leftCount = rel.getLeft().getRowType().getFieldCount();
+ final int rightCount = rel.getRight().getRowType().getFieldCount();
+ context.values = new Object[rel.getRowType().getFieldCount()];
+ Row left;
+ Row right;
+ while ((left = leftSource.receive()) != null) {
+ System.arraycopy(left.getValues(), 0, context.values, 0, leftCount);
+ if (rightList == null) {
+ rightList = Lists.newArrayList();
+ while ((right = rightSource.receive()) != null) {
+ rightList.add(right);
+ }
+ }
+ for (Row right2 : rightList) {
+ System.arraycopy(right2.getValues(), 0, context.values, leftCount,
+ rightCount);
+ final Boolean execute = (Boolean) condition.execute(context);
+ if (execute != null && execute) {
+ sink.send(Row.asCopy(context.values));
+ }
+ }
+ }
+ }
+}
+
+// End JoinNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
index b09e7c3..1e067bb 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
@@ -24,10 +24,13 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.rules.FilterTableRule;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.FilterableTable;
@@ -149,16 +152,29 @@ public class Nodes {
}
public void visit(TableScan scan) {
- node = new ScanNode(interpreter, scan, ImmutableList.<RexNode>of(), null);
+ node = new TableScanNode(interpreter, scan, ImmutableList.<RexNode>of(),
+ null);
}
public void visit(FilterScan scan) {
- node = new ScanNode(interpreter, scan, scan.filters, scan.projects);
+ node = new TableScanNode(interpreter, scan, scan.filters, scan.projects);
}
public void visit(Sort sort) {
node = new SortNode(interpreter, sort);
}
+
+ public void visit(Union union) {
+ node = new UnionNode(interpreter, union);
+ }
+
+ public void visit(Join join) {
+ node = new JoinNode(interpreter, join);
+ }
+
+ public void visit(Window window) {
+ node = new WindowNode(interpreter, window);
+ }
}
/** Table scan that applies filters and optionally projects. Only used in an
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java b/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java
new file mode 100644
index 0000000..ab41cac
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/NoneToBindableConverterRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+
+/**
+ * Rule to convert a relational expression from
+ * {@link org.apache.calcite.plan.Convention#NONE}
+ * to {@link org.apache.calcite.interpreter.BindableConvention}.
+ */
+public class NoneToBindableConverterRule extends ConverterRule {
+ public static final ConverterRule INSTANCE =
+ new NoneToBindableConverterRule();
+
+ private NoneToBindableConverterRule() {
+ super(RelNode.class, Convention.NONE, BindableConvention.INSTANCE,
+ "NoneToBindableConverterRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+ return new InterpretableConverter(rel.getCluster(), newTraitSet, rel);
+ }
+}
+
+// End NoneToBindableConverterRule.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
index bca464b..1737b8b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java
@@ -17,28 +17,20 @@
package org.apache.calcite.interpreter;
import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexNode;
-
-import com.google.common.collect.ImmutableList;
/**
* Interpreter node that implements a
* {@link org.apache.calcite.rel.logical.LogicalFilter}.
*/
-public class ProjectNode implements Node {
- private final ImmutableList<Scalar> projects;
- private final Source source;
- private final Sink sink;
+public class ProjectNode extends AbstractSingleNode<Project> {
+ private final Scalar scalar;
private final Context context;
+ private final int projectCount;
public ProjectNode(Interpreter interpreter, Project rel) {
- ImmutableList.Builder<Scalar> builder = ImmutableList.builder();
- for (RexNode node : rel.getProjects()) {
- builder.add(interpreter.compile(node));
- }
- this.projects = builder.build();
- this.source = interpreter.source(rel, 0);
- this.sink = interpreter.sink(rel);
+ super(interpreter, rel);
+ this.projectCount = rel.getProjects().size();
+ this.scalar = interpreter.compile(rel.getProjects(), rel.getInputs());
this.context = interpreter.createContext();
}
@@ -46,11 +38,8 @@ public class ProjectNode implements Node {
Row row;
while ((row = source.receive()) != null) {
context.values = row.getValues();
- Object[] values = new Object[projects.size()];
- for (int i = 0; i < projects.size(); i++) {
- Scalar scalar = projects.get(i);
- values[i] = scalar.execute(context);
- }
+ Object[] values = new Object[projectCount];
+ scalar.execute(context, values);
sink.send(new Row(values));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Scalar.java b/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
index 2f7e923..795e9d4 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Scalar.java
@@ -21,6 +21,7 @@ package org.apache.calcite.interpreter;
*/
public interface Scalar {
Object execute(Context context);
+ void execute(Context context, Object[] results);
}
// End Scalar.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java
deleted file mode 100644
index 2688353..0000000
--- a/core/src/main/java/org/apache/calcite/interpreter/ScanNode.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.interpreter;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.runtime.Enumerables;
-import org.apache.calcite.schema.FilterableTable;
-import org.apache.calcite.schema.ProjectableFilterableTable;
-import org.apache.calcite.schema.QueryableTable;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Util;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * Interpreter node that implements a
- * {@link org.apache.calcite.rel.core.TableScan}.
- */
-public class ScanNode implements Node {
- private final Sink sink;
- private final TableScan rel;
- private final ImmutableList<RexNode> filters;
- private final DataContext root;
- private final int[] projects;
-
- public ScanNode(Interpreter interpreter, TableScan rel,
- ImmutableList<RexNode> filters, ImmutableIntList projects) {
- this.rel = rel;
- this.filters = Preconditions.checkNotNull(filters);
- this.projects = projects == null ? null : projects.toIntArray();
- this.sink = interpreter.sink(rel);
- this.root = interpreter.getDataContext();
- }
-
- public void run() throws InterruptedException {
- final Enumerable<Row> iterable = iterable();
- final Enumerator<Row> enumerator = iterable.enumerator();
- while (enumerator.moveNext()) {
- sink.send(enumerator.current());
- }
- enumerator.close();
- sink.end();
- }
-
- private Enumerable<Row> iterable() {
- final RelOptTable table = rel.getTable();
- final ProjectableFilterableTable pfTable =
- table.unwrap(ProjectableFilterableTable.class);
- if (pfTable != null) {
- final List<RexNode> filters1 = Lists.newArrayList(filters);
- final int[] projects1 =
- isIdentity(projects, rel.getRowType().getFieldCount())
- ? null : projects;
- final Enumerable<Object[]> enumerator =
- pfTable.scan(root, filters1, projects1);
- assert filters1.isEmpty()
- : "table could not handle a filter it earlier said it could";
- return Enumerables.toRow(enumerator);
- }
- if (projects != null) {
- throw new AssertionError("have projects, but table cannot handle them");
- }
- final FilterableTable filterableTable =
- table.unwrap(FilterableTable.class);
- if (filterableTable != null) {
- final List<RexNode> filters1 = Lists.newArrayList(filters);
- final Enumerable<Object[]> enumerator =
- filterableTable.scan(root, filters1);
- assert filters1.isEmpty()
- : "table could not handle a filter it earlier said it could";
- return Enumerables.toRow(enumerator);
- }
- if (!filters.isEmpty()) {
- throw new AssertionError("have filters, but table cannot handle them");
- }
- //noinspection unchecked
- Enumerable<Row> iterable = table.unwrap(Enumerable.class);
- if (iterable != null) {
- return iterable;
- }
- final QueryableTable queryableTable = table.unwrap(QueryableTable.class);
- if (queryableTable != null) {
- final Type elementType = queryableTable.getElementType();
- SchemaPlus schema = root.getRootSchema();
- for (String name : Util.skipLast(table.getQualifiedName())) {
- schema = schema.getSubSchema(name);
- }
- if (elementType instanceof Class) {
- //noinspection unchecked
- final Queryable<Object> queryable = Schemas.queryable(root,
- (Class) elementType, table.getQualifiedName());
- ImmutableList.Builder<Field> fieldBuilder = ImmutableList.builder();
- Class type = (Class) elementType;
- for (Field field : type.getFields()) {
- if (Modifier.isPublic(field.getModifiers())
- && !Modifier.isStatic(field.getModifiers())) {
- fieldBuilder.add(field);
- }
- }
- final List<Field> fields = fieldBuilder.build();
- return queryable.select(
- new Function1<Object, Row>() {
- public Row apply(Object o) {
- final Object[] values = new Object[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- try {
- values[i] = field.get(o);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- return new Row(values);
- }
- });
- } else {
- return Schemas.queryable(root, Row.class,
- table.getQualifiedName());
- }
- }
- final ScannableTable scannableTable =
- table.unwrap(ScannableTable.class);
- if (scannableTable != null) {
- return Enumerables.toRow(scannableTable.scan(root));
- }
- throw new AssertionError("cannot convert table " + table + " to iterable");
- }
-
- private static boolean isIdentity(int[] is, int count) {
- if (is.length != count) {
- return false;
- }
- for (int i = 0; i < is.length; i++) {
- if (is[i] != i) {
- return false;
- }
- }
- return true;
- }
-}
-
-// End ScanNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
index e5fb91d..1e5f07b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java
@@ -34,15 +34,9 @@ import java.util.List;
* Interpreter node that implements a
* {@link org.apache.calcite.rel.core.Sort}.
*/
-public class SortNode implements Node {
- private final Source source;
- private final Sink sink;
- private final Sort rel;
-
+public class SortNode extends AbstractSingleNode<Sort> {
public SortNode(Interpreter interpreter, Sort rel) {
- this.rel = rel;
- this.source = interpreter.source(rel, 0);
- this.sink = interpreter.sink(rel);
+ super(interpreter, rel);
}
public void run() throws InterruptedException {
@@ -103,30 +97,55 @@ public class SortNode implements Node {
}));
}
+ private static int compare(Comparable c1, Comparable c2,
+ int nullComparison) {
+ if (c1 == c2) {
+ return 0;
+ } else if (c1 == null) {
+ return nullComparison;
+ } else if (c2 == null) {
+ return -nullComparison;
+ } else {
+ //noinspection unchecked
+ return c1.compareTo(c2);
+ }
+ }
+
private Comparator<Row> comparator(final RelFieldCollation fieldCollation) {
+ final int nullComparison = getNullComparison(fieldCollation.nullDirection);
switch (fieldCollation.direction) {
case ASCENDING:
return new Comparator<Row>() {
- final int x = fieldCollation.getFieldIndex();
public int compare(Row o1, Row o2) {
+ final int x = fieldCollation.getFieldIndex();
final Comparable c1 = (Comparable) o1.getValues()[x];
final Comparable c2 = (Comparable) o2.getValues()[x];
- //noinspection unchecked
- return c1.compareTo(c2);
+ return SortNode.compare(c1, c2, nullComparison);
}
};
default:
return new Comparator<Row>() {
- final int x = fieldCollation.getFieldIndex();
public int compare(Row o1, Row o2) {
+ final int x = fieldCollation.getFieldIndex();
final Comparable c1 = (Comparable) o1.getValues()[x];
final Comparable c2 = (Comparable) o2.getValues()[x];
- //noinspection unchecked
- return c2.compareTo(c1);
+ return SortNode.compare(c2, c1, -nullComparison);
}
};
}
}
+
+ private int getNullComparison(RelFieldCollation.NullDirection nullDirection) {
+ switch (nullDirection) {
+ case FIRST:
+ return -1;
+ case UNSPECIFIED:
+ case LAST:
+ return 1;
+ default:
+ throw new AssertionError(nullDirection);
+ }
+ }
}
// End SortNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
new file mode 100644
index 0000000..6f2c90f
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.Enumerables;
+import org.apache.calcite.schema.FilterableTable;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.TableScan}.
+ */
+public class TableScanNode implements Node {
+ private final Sink sink;
+ private final TableScan rel;
+ private final ImmutableList<RexNode> filters;
+ private final DataContext root;
+ private final int[] projects;
+
+ TableScanNode(Interpreter interpreter, TableScan rel,
+ ImmutableList<RexNode> filters, ImmutableIntList projects) {
+ this.rel = rel;
+ this.filters = Preconditions.checkNotNull(filters);
+ this.projects = projects == null ? null : projects.toIntArray();
+ this.sink = interpreter.sink(rel);
+ this.root = interpreter.getDataContext();
+ }
+
+ public void run() throws InterruptedException {
+ final Enumerable<Row> iterable = iterable();
+ final Enumerator<Row> enumerator = iterable.enumerator();
+ while (enumerator.moveNext()) {
+ sink.send(enumerator.current());
+ }
+ enumerator.close();
+ sink.end();
+ }
+
+ private Enumerable<Row> iterable() {
+ final RelOptTable table = rel.getTable();
+ final ProjectableFilterableTable pfTable =
+ table.unwrap(ProjectableFilterableTable.class);
+ if (pfTable != null) {
+ final List<RexNode> filters1 = Lists.newArrayList(filters);
+ final int[] projects1 =
+ projects == null
+ || isIdentity(projects, rel.getRowType().getFieldCount())
+ ? null : projects;
+ final Enumerable<Object[]> enumerator =
+ pfTable.scan(root, filters1, projects1);
+ assert filters1.isEmpty()
+ : "table could not handle a filter it earlier said it could";
+ return Enumerables.toRow(enumerator);
+ }
+ if (projects != null) {
+ throw new AssertionError("have projects, but table cannot handle them");
+ }
+ final FilterableTable filterableTable =
+ table.unwrap(FilterableTable.class);
+ if (filterableTable != null) {
+ final List<RexNode> filters1 = Lists.newArrayList(filters);
+ final Enumerable<Object[]> enumerator =
+ filterableTable.scan(root, filters1);
+ assert filters1.isEmpty()
+ : "table could not handle a filter it earlier said it could";
+ return Enumerables.toRow(enumerator);
+ }
+ if (!filters.isEmpty()) {
+ throw new AssertionError("have filters, but table cannot handle them");
+ }
+ final ScannableTable scannableTable =
+ table.unwrap(ScannableTable.class);
+ if (scannableTable != null) {
+ return Enumerables.toRow(scannableTable.scan(root));
+ }
+ //noinspection unchecked
+ Enumerable<Row> iterable = table.unwrap(Enumerable.class);
+ if (iterable != null) {
+ return iterable;
+ }
+ final QueryableTable queryableTable = table.unwrap(QueryableTable.class);
+ if (queryableTable != null) {
+ final Type elementType = queryableTable.getElementType();
+ SchemaPlus schema = root.getRootSchema();
+ for (String name : Util.skipLast(table.getQualifiedName())) {
+ schema = schema.getSubSchema(name);
+ }
+ if (elementType instanceof Class) {
+ //noinspection unchecked
+ final Queryable<Object> queryable = Schemas.queryable(root,
+ (Class) elementType, table.getQualifiedName());
+ ImmutableList.Builder<Field> fieldBuilder = ImmutableList.builder();
+ Class type = (Class) elementType;
+ for (Field field : type.getFields()) {
+ if (Modifier.isPublic(field.getModifiers())
+ && !Modifier.isStatic(field.getModifiers())) {
+ fieldBuilder.add(field);
+ }
+ }
+ final List<Field> fields = fieldBuilder.build();
+ return queryable.select(
+ new Function1<Object, Row>() {
+ public Row apply(Object o) {
+ final Object[] values = new Object[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ Field field = fields.get(i);
+ try {
+ values[i] = field.get(o);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return new Row(values);
+ }
+ });
+ } else {
+ return Schemas.queryable(root, Row.class,
+ table.getQualifiedName());
+ }
+ }
+ throw new AssertionError("cannot convert table " + table + " to iterable");
+ }
+
+ private static boolean isIdentity(int[] is, int count) {
+ if (is.length != count) {
+ return false;
+ }
+ for (int i = 0; i < is.length; i++) {
+ if (is[i] != i) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
+
+// End TableScanNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
new file mode 100644
index 0000000..701dbc7
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.core.Union;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.Union}.
+ */
+public class UnionNode implements Node {
+ private final ImmutableList<Source> sources;
+ private final Sink sink;
+ private final Union rel;
+
+ public UnionNode(Interpreter interpreter, Union rel) {
+ ImmutableList.Builder<Source> builder = ImmutableList.builder();
+ for (int i = 0; i < rel.getInputs().size(); i++) {
+ builder.add(interpreter.source(rel, i));
+ }
+ this.sources = builder.build();
+ this.sink = interpreter.sink(rel);
+ this.rel = rel;
+ }
+
+ public void run() throws InterruptedException {
+ final Set<Row> rows = rel.all ? null : Sets.<Row>newHashSet();
+ for (Source source : sources) {
+ Row row;
+ while ((row = source.receive()) != null) {
+ if (rows == null || rows.add(row)) {
+ sink.send(row);
+ }
+ }
+ }
+ }
+}
+
+// End UnionNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
index fe68790..ff8c950 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java
@@ -16,8 +16,13 @@
*/
package org.apache.calcite.interpreter;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Values;
import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import java.util.List;
@@ -27,23 +32,38 @@ import java.util.List;
*/
public class ValuesNode implements Node {
private final Sink sink;
- private final Values rel;
private final int fieldCount;
+ private final ImmutableList<Row> rows;
public ValuesNode(Interpreter interpreter, Values rel) {
- this.rel = rel;
this.sink = interpreter.sink(rel);
this.fieldCount = rel.getRowType().getFieldCount();
+ this.rows = createRows(interpreter, rel.getTuples());
+ }
+
+ private ImmutableList<Row> createRows(Interpreter interpreter,
+ ImmutableList<ImmutableList<RexLiteral>> tuples) {
+ final List<RexNode> nodes = Lists.newArrayList();
+ for (ImmutableList<RexLiteral> tuple : tuples) {
+ nodes.addAll(tuple);
+ }
+ final Scalar scalar =
+ interpreter.compile(nodes, ImmutableList.<RelNode>of());
+ final Object[] values = new Object[nodes.size()];
+ final Context context = interpreter.createContext();
+ scalar.execute(context, values);
+ final ImmutableList.Builder<Row> rows = ImmutableList.builder();
+ Object[] subValues = new Object[fieldCount];
+ for (int i = 0; i < values.length; i += fieldCount) {
+ System.arraycopy(values, i, subValues, 0, fieldCount);
+ rows.add(Row.asCopy(subValues));
+ }
+ return rows.build();
}
public void run() throws InterruptedException {
- for (List<RexLiteral> list : rel.getTuples()) {
- final Object[] values = new Object[fieldCount];
- for (int i = 0; i < list.size(); i++) {
- RexLiteral rexLiteral = list.get(i);
- values[i] = rexLiteral.getValue();
- }
- sink.send(new Row(values));
+ for (Row row : rows) {
+ sink.send(row);
}
sink.end();
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
new file mode 100644
index 0000000..a03608c
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.core.Window;
+
+/**
+ * Interpreter node that implements a
+ * {@link org.apache.calcite.rel.core.Window}.
+ */
+public class WindowNode extends AbstractSingleNode<Window> {
+ WindowNode(Interpreter interpreter, Window rel) {
+ super(interpreter, rel);
+ }
+
+ public void run() throws InterruptedException {
+ Row row;
+ while ((row = source.receive()) != null) {
+ sink.send(row);
+ }
+ sink.end();
+ }
+}
+
+// End WindowNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/66cfb120/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java b/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
index 00e61b1..acbdfe9 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java
@@ -33,6 +33,7 @@ import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.runtime.ArrayBindable;
import org.apache.calcite.runtime.Bindable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidator;
@@ -104,7 +105,7 @@ public interface CalcitePrepare {
boolean enabled();
- Bindable compile(ClassDeclaration expr, String s);
+ ArrayBindable compile(ClassDeclaration expr, String s);
Object sparkContext();
@@ -177,7 +178,7 @@ public interface CalcitePrepare {
return false;
}
- public Bindable compile(ClassDeclaration expr, String s) {
+ public ArrayBindable compile(ClassDeclaration expr, String s) {
throw new UnsupportedOperationException();
}