You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/02 02:09:50 UTC
[14/18] incubator-calcite git commit: Further fix up [CALCITE-809], eliminating the proxy sink
Further fix up [CALCITE-809], eliminating the proxy sink
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/5e7d457a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/5e7d457a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/5e7d457a
Branch: refs/heads/master
Commit: 5e7d457ae82cc7a0e3bf055fa2e1f45b19186dcf
Parents: a3da691
Author: Julian Hyde <jh...@apache.org>
Authored: Mon Aug 10 17:24:09 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Sep 1 16:17:16 2015 -0700
----------------------------------------------------------------------
.../apache/calcite/interpreter/Interpreter.java | 130 ++++++++-----------
.../calcite/interpreter/TableScanNode.java | 10 +-
.../java/org/apache/calcite/linq4j/Linq4j.java | 85 +++++-------
.../calcite/linq4j/TransformedEnumerator.java | 51 ++++++++
.../apache/calcite/linq4j/test/Linq4jTest.java | 22 ++++
5 files changed, 161 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index e4c2cfe..e14bffe 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -18,18 +18,16 @@ package org.apache.calcite.interpreter;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
-import org.apache.calcite.linq4j.DelegatingEnumerator;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.TransformedEnumerator;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.rules.CalcSplitRule;
import org.apache.calcite.rel.rules.FilterTableScanRule;
import org.apache.calcite.rel.rules.ProjectTableScanRule;
@@ -92,29 +90,21 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
public Enumerator<Object[]> enumerator() {
start();
- Sink sink = nodes.get(rootRel).sink;
- Enumerator<Row> rows;
- if (sink instanceof EnumerableProxySink) {
- rows = ((EnumerableProxySink) sink).enumerable.enumerator();
+ final NodeInfo nodeInfo = nodes.get(rootRel);
+ final Enumerator<Row> rows;
+ if (nodeInfo.rowEnumerable != null) {
+ rows = nodeInfo.rowEnumerable.enumerator();
} else {
- final ArrayDeque<Row> queue = ((ListSink) sink).list;
- rows = Linq4j.asEnumerable(queue).enumerator();
+ final ArrayDeque<Row> queue = ((ListSink) nodeInfo.sink).list;
+ rows = Linq4j.iterableEnumerator(queue);
}
- return new DelegatingEnumerator<Object[]>(Linq4j.transform(rows, rowConverter)) {
- @Override public void close() {
- super.close();
- Interpreter.this.close();
- }
- };
- }
-
- private Function1<Row, Object[]> rowConverter =
- new Function1<Row, Object[]>() {
- @Override public Object[] apply(Row row) {
+ return new TransformedEnumerator<Row, Object[]>(rows) {
+ protected Object[] transform(Row row) {
return row.getValues();
}
};
+ }
private void start() {
// We rely on the nodes being ordered leaves first.
@@ -259,15 +249,16 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
public Source source(RelNode rel, int ordinal) {
final RelNode input = getInput(rel, ordinal);
- final NodeInfo x = nodes.get(input);
- if (x == null) {
+ final NodeInfo nodeInfo = nodes.get(input);
+ if (nodeInfo == null) {
throw new AssertionError("should be registered: " + rel);
}
- Sink sink = x.sink;
+ if (nodeInfo.rowEnumerable != null) {
+ return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator());
+ }
+ Sink sink = nodeInfo.sink;
if (sink instanceof ListSink) {
- return new ListSource((ListSink) x.sink);
- } else if (sink instanceof EnumerableProxySink) {
- return new EnumerableProxySource((EnumerableProxySink) sink);
+ return new ListSource((ListSink) nodeInfo.sink);
}
throw new IllegalStateException(
"Got a sink " + sink + " to which there is no match source type!");
@@ -281,19 +272,39 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
return rel.getInput(ordinal);
}
+ /**
+ * Creates a Sink for a relational expression to write into.
+ *
+ * <p>This method is generally called from the constructor of a {@link Node}.
+ * But a constructor could instead call
+ * {@link #enumerable(RelNode, Enumerable)}.
+ *
+ * @param rel Relational expression
+ * @return Sink
+ */
public Sink sink(RelNode rel) {
- final Sink sink;
- if (rel instanceof TableScan) {
- sink = new EnumerableProxySink();
- } else {
- final ArrayDeque<Row> queue = new ArrayDeque<>(1);
- sink = new ListSink(queue);
- }
- NodeInfo nodeInfo = new NodeInfo(rel, sink);
+ final ArrayDeque<Row> queue = new ArrayDeque<>(1);
+ final Sink sink = new ListSink(queue);
+ NodeInfo nodeInfo = new NodeInfo(rel, sink, null);
nodes.put(rel, nodeInfo);
return sink;
}
+ /** Tells the interpreter that a given relational expression wishes to
+ * give its output as an enumerable.
+ *
+ * <p>This is as opposed to the norm, where a relational expression calls
+ * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that
+ * sink.
+ *
+ * @param rel Relational expression
+ * @param rowEnumerable Contents of relational expression
+ */
+ public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) {
+ NodeInfo nodeInfo = new NodeInfo(rel, null, rowEnumerable);
+ nodes.put(rel, nodeInfo);
+ }
+
public Context createContext() {
return new Context(dataContext);
}
@@ -306,37 +317,14 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
private static class NodeInfo {
final RelNode rel;
final Sink sink;
+ final Enumerable<Row> rowEnumerable;
Node node;
- public NodeInfo(RelNode rel, Sink sink) {
+ public NodeInfo(RelNode rel, Sink sink, Enumerable<Row> rowEnumerable) {
this.rel = rel;
this.sink = sink;
- }
- }
-
- /**
- * A sink that just proxies for an {@link org.apache.calcite.linq4j.Enumerable}. As such, its
- * not really a "sink" but instead just a thin layer for the {@link EnumerableProxySource} to
- * get an enumerator.
- *
- * <p>It can be little bit slower than the {@link Interpreter.ListSink} when trying to iterate
- * over the elements of the enumerable, unless the enumerable is backed by an in-memory cache
- * of the rows.
- */
- private static class EnumerableProxySink implements Sink {
- private Enumerable<Row> enumerable;
-
- @Override public void send(Row row) throws InterruptedException {
- throw new UnsupportedOperationException("Rows are only added through the "
- + "enumerable passed in through #setSourceEnumerable()!");
- }
-
- @Override public void end() throws InterruptedException {
- // noop
- }
-
- @Override public void setSourceEnumerable(Enumerable<Row> enumerable) {
- this.enumerable = enumerable;
+ this.rowEnumerable = rowEnumerable;
+ assert (sink != null) != (rowEnumerable != null) : "one or the other";
}
}
@@ -344,34 +332,24 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
* A {@link Source} that is just backed by an {@link Enumerator}. The {@link Enumerator} is closed
* when it is finished or by calling {@link #close()}.
*/
- private static class EnumerableProxySource implements Source {
+ private static class EnumeratorSource implements Source {
+ private final Enumerator<Row> enumerator;
- private Enumerator<Row> enumerator;
- private final EnumerableProxySink source;
-
- public EnumerableProxySource(EnumerableProxySink sink) {
- this.source = sink;
+ public EnumeratorSource(final Enumerator<Row> enumerator) {
+ this.enumerator = Preconditions.checkNotNull(enumerator);
}
@Override public Row receive() {
- if (enumerator == null) {
- enumerator = source.enumerable.enumerator();
- assert enumerator != null
- : "Sink did not set enumerable before source was asked for a row!";
- }
if (enumerator.moveNext()) {
return enumerator.current();
}
// close the enumerator once we have gone through everything
enumerator.close();
- this.enumerator = null;
return null;
}
@Override public void close() {
- if (this.enumerator != null) {
- this.enumerator.close();
- }
+ enumerator.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
index 2824b37..c88627b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -59,19 +59,13 @@ import static org.apache.calcite.util.Static.RESOURCE;
* {@link org.apache.calcite.rel.core.TableScan}.
*/
public class TableScanNode implements Node {
- private final Sink sink;
- private final Enumerable<Row> enumerable;
-
private TableScanNode(Interpreter interpreter, TableScan rel,
Enumerable<Row> enumerable) {
- this.enumerable = enumerable;
- this.sink = interpreter.sink(rel);
+ interpreter.enumerable(rel, enumerable);
}
-
public void run() throws InterruptedException {
- sink.setSourceEnumerable(enumerable);
- sink.end();
+ // nothing to do
}
/** Creates a TableScanNode.
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
index 85f8bc6..da46850 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
@@ -102,7 +102,7 @@ public abstract class Linq4j {
* @return Iterator
*/
public static <T> Iterator<T> enumeratorIterator(Enumerator<T> enumerator) {
- return new EnumeratorIterator<T>(enumerator);
+ return new EnumeratorIterator<>(enumerator);
}
/**
@@ -120,7 +120,7 @@ public abstract class Linq4j {
(Enumerable) iterable;
return enumerable.enumerator();
}
- return new IterableEnumerator<T>(iterable);
+ return new IterableEnumerator<>(iterable);
}
/**
@@ -132,7 +132,7 @@ public abstract class Linq4j {
* @return enumerable
*/
public static <T> Enumerable<T> asEnumerable(final List<T> list) {
- return new ListEnumerable<T>(list);
+ return new ListEnumerable<>(list);
}
/**
@@ -151,7 +151,7 @@ public abstract class Linq4j {
//noinspection unchecked
return asEnumerable((List) collection);
}
- return new CollectionEnumerable<T>(collection);
+ return new CollectionEnumerable<>(collection);
}
/**
@@ -170,7 +170,7 @@ public abstract class Linq4j {
//noinspection unchecked
return asEnumerable((Collection) iterable);
}
- return new IterableEnumerable<T>(iterable);
+ return new IterableEnumerable<>(iterable);
}
/**
@@ -182,7 +182,7 @@ public abstract class Linq4j {
* @return enumerable
*/
public static <T> Enumerable<T> asEnumerable(final T[] ts) {
- return new ListEnumerable<T>(Arrays.asList(ts));
+ return new ListEnumerable<>(Arrays.asList(ts));
}
/**
@@ -202,12 +202,24 @@ public abstract class Linq4j {
}
private static <V> Enumerator<V> listEnumerator(List<? extends V> list) {
- return new ListEnumerator<V>(list);
+ return new ListEnumerator<>(list);
}
- public static <T, R> Enumerator<R> transform(final Enumerator<T> enumerator,
- final Function1<T, R> func) {
- return new TransformedEnumerator<>(enumerator, func);
+ /** Applies a function to each element of an Enumerator.
+ *
+ * @param enumerator Backing enumerator
+ * @param func Transform function
+ * @param <F> Backing element type
+ * @param <E> Element type
+ * @return Enumerator
+ */
+ public static <F, E> Enumerator<E> transform(Enumerator<F> enumerator,
+ final Function1<F, E> func) {
+ return new TransformedEnumerator<F, E>(enumerator) {
+ protected E transform(F from) {
+ return func.apply(from);
+ }
+ };
}
/**
@@ -225,7 +237,7 @@ public abstract class Linq4j {
* query operators to be invoked on collections
* (including {@link java.util.List} and {@link java.util.Set}) by supplying
* the necessary type information. For example, {@link ArrayList} does not
- * implement {@link Enumerable}<T>, but you can invoke
+ * implement {@link Enumerable}<F>, but you can invoke
*
* <blockquote><code>Linq4j.cast(list, Integer.class)</code></blockquote>
*
@@ -266,7 +278,7 @@ public abstract class Linq4j {
* query operators to be invoked on collections
* (including {@link java.util.List} and {@link java.util.Set}) by supplying
* the necessary type information. For example, {@link ArrayList} does not
- * implement {@link Enumerable}<T>, but you can invoke
+ * implement {@link Enumerable}<F>, but you can invoke
*
* <blockquote><code>Linq4j.ofType(list, Integer.class)</code></blockquote>
*
@@ -304,7 +316,7 @@ public abstract class Linq4j {
* @return Singleton enumerator
*/
public static <T> Enumerator<T> singletonEnumerator(T element) {
- return new SingletonEnumerator<T>(element);
+ return new SingletonEnumerator<>(element);
}
/**
@@ -315,7 +327,7 @@ public abstract class Linq4j {
* @return Singleton enumerator
*/
public static <T> Enumerator<T> singletonNullEnumerator() {
- return new SingletonNullEnumerator<T>();
+ return new SingletonNullEnumerator<>();
}
/**
@@ -353,7 +365,7 @@ public abstract class Linq4j {
*/
public static <E> Enumerable<E> concat(
final List<Enumerable<E>> enumerableList) {
- return new CompositeEnumerable<E>(enumerableList);
+ return new CompositeEnumerable<>(enumerableList);
}
/**
@@ -379,7 +391,7 @@ public abstract class Linq4j {
*/
public static <T> Enumerator<List<T>> product(
List<Enumerator<T>> enumerators) {
- return new CartesianProductEnumerator<T>(enumerators);
+ return new CartesianProductEnumerator<>(enumerators);
}
/** Returns the cartesian product of an iterable of iterables. */
@@ -392,7 +404,7 @@ public abstract class Linq4j {
enumerators.add(iterableEnumerator(iterable));
}
return enumeratorIterator(
- new CartesianProductEnumerator<T>(enumerators));
+ new CartesianProductEnumerator<>(enumerators));
}
};
}
@@ -599,7 +611,7 @@ public abstract class Linq4j {
@Override public Enumerator<T> enumerator() {
if (iterable instanceof RandomAccess) {
//noinspection unchecked
- return new ListEnumerator<T>((List) iterable);
+ return new ListEnumerator<>((List) iterable);
}
return super.enumerator();
}
@@ -613,7 +625,7 @@ public abstract class Linq4j {
if (count >= list.size()) {
return Linq4j.emptyEnumerable();
}
- return new ListEnumerable<T>(list.subList(count, list.size()));
+ return new ListEnumerable<>(list.subList(count, list.size()));
}
@Override public Enumerable<T> take(int count) {
@@ -621,7 +633,7 @@ public abstract class Linq4j {
if (count >= list.size()) {
return this;
}
- return new ListEnumerable<T>(list.subList(0, count));
+ return new ListEnumerable<>(list.subList(0, count));
}
@Override public T elementAt(int index) {
@@ -727,39 +739,6 @@ public abstract class Linq4j {
public void close() {
}
}
-
- /** Enumerator that applies a transform to each value from a backing
- * enumerator.
- *
- * @param <T> Element type of backing enumerator
- * @param <R> Element type
- */
- private static class TransformedEnumerator<T, R> implements Enumerator<R> {
- private final Enumerator<T> enumerator;
- private final Function1<T, R> func;
-
- public TransformedEnumerator(Enumerator<T> enumerator,
- Function1<T, R> func) {
- this.enumerator = enumerator;
- this.func = func;
- }
-
- public boolean moveNext() {
- return enumerator.moveNext();
- }
-
- public R current() {
- return func.apply(enumerator.current());
- }
-
- public void reset() {
- enumerator.reset();
- }
-
- public void close() {
- enumerator.close();
- }
- }
}
// End Linq4j.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java b/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java
new file mode 100644
index 0000000..839c338
--- /dev/null
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j;
+
+/** Enumerator that applies a transform to each value from a backing
+ * enumerator.
+ *
+ * @param <F> Element type of backing enumerator
+ * @param <E> Element type
+ */
+public abstract class TransformedEnumerator<F, E> implements Enumerator<E> {
+ protected final Enumerator<F> enumerator;
+
+ public TransformedEnumerator(Enumerator<F> enumerator) {
+ this.enumerator = enumerator;
+ }
+
+ protected abstract E transform(F from);
+
+ public boolean moveNext() {
+ return enumerator.moveNext();
+ }
+
+ public E current() {
+ return transform(enumerator.current());
+ }
+
+ public void reset() {
+ enumerator.reset();
+ }
+
+ public void close() {
+ enumerator.close();
+ }
+}
+
+// End TransformedEnumerator.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
----------------------------------------------------------------------
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
index 338251b..dfecf68 100644
--- a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
@@ -855,6 +855,28 @@ public class Linq4jTest {
assertThat(enumerator.moveNext(), is(false));
}
+ @Test public void testTransformEnumerator() {
+ final List<String> strings = Arrays.asList("one", "two", "three");
+ final Function1<String, Integer> func = new Function1<String, Integer>() {
+ public Integer apply(String a0) {
+ return a0.length();
+ }
+ };
+ final Enumerator<Integer> enumerator =
+ Linq4j.transform(Linq4j.enumerator(strings), func);
+ assertThat(enumerator.moveNext(), is(true));
+ assertThat(enumerator.current(), is(3));
+ assertThat(enumerator.moveNext(), is(true));
+ assertThat(enumerator.current(), is(3));
+ assertThat(enumerator.moveNext(), is(true));
+ assertThat(enumerator.current(), is(5));
+ assertThat(enumerator.moveNext(), is(false));
+
+ final Enumerator<Integer> enumerator2 =
+ Linq4j.transform(Linq4j.<String>emptyEnumerator(), func);
+ assertThat(enumerator2.moveNext(), is(false));
+ }
+
@Test public void testCast() {
final List<Number> numbers = Arrays.asList((Number) 2, null, 3.14, 5);
final Enumerator<Integer> enumerator =