You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/02 02:09:42 UTC
[06/18] incubator-calcite git commit: Fix up [CALCITE-809],
deprecating Sink.setSourceEnumerable
Fix up [CALCITE-809], deprecating Sink.setSourceEnumerable
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/a3da6915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/a3da6915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/a3da6915
Branch: refs/heads/master
Commit: a3da69153b030552abf87f279487ebda4a5ae69c
Parents: 30d618d
Author: Julian Hyde <jh...@apache.org>
Authored: Mon Aug 10 15:11:24 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Sep 1 16:17:15 2015 -0700
----------------------------------------------------------------------
.../apache/calcite/interpreter/Interpreter.java | 60 +++++++----------
.../org/apache/calcite/interpreter/Sink.java | 2 +
.../org/apache/calcite/test/StreamTest.java | 69 ++++++++++----------
.../calcite/linq4j/DelegatingEnumerator.java | 6 +-
.../java/org/apache/calcite/linq4j/Linq4j.java | 42 ++++++++++--
5 files changed, 100 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index 4c0886e..e4c2cfe 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -102,20 +102,19 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
}
return new DelegatingEnumerator<Object[]>(Linq4j.transform(rows, rowConverter)) {
- @Override
- public void close() {
+ @Override public void close() {
super.close();
Interpreter.this.close();
}
};
}
- private Function1<Row, Object[]> rowConverter = new Function1<Row, Object[]>() {
- @Override
- public Object[] apply(Row row) {
- return row.getValues();
- }
- };
+ private Function1<Row, Object[]> rowConverter =
+ new Function1<Row, Object[]>() {
+ @Override public Object[] apply(Row row) {
+ return row.getValues();
+ }
+ };
private void start() {
// We rely on the nodes being ordered leaves first.
@@ -287,7 +286,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
if (rel instanceof TableScan) {
sink = new EnumerableProxySink();
} else {
- final ArrayDeque<Row> queue = new ArrayDeque<Row>(1);
+ final ArrayDeque<Row> queue = new ArrayDeque<>(1);
sink = new ListSink(queue);
}
NodeInfo nodeInfo = new NodeInfo(rel, sink);
@@ -319,36 +318,31 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
* A sink that just proxies for an {@link org.apache.calcite.linq4j.Enumerable}. As such, its
* not really a "sink" but instead just a thin layer for the {@link EnumerableProxySource} to
* get an enumerator.
- * <p>
- * It can be little bit slower than the {@link Interpreter.ListSink} when trying to iterate
+ *
+ * <p>It can be a little bit slower than the {@link Interpreter.ListSink} when trying to iterate
* over the elements of the enumerable, unless the enumerable is backed by an in-memory cache
* of the rows.
- * </p>
*/
private static class EnumerableProxySink implements Sink {
-
private Enumerable<Row> enumerable;
- @Override
- public void send(Row row) throws InterruptedException {
- throw new UnsupportedOperationException("Row are only added through the enumerable passed "
- + "in through #setSourceEnumerable()!");
+ @Override public void send(Row row) throws InterruptedException {
+ throw new UnsupportedOperationException("Rows are only added through the "
+ + "enumerable passed in through #setSourceEnumerable()!");
}
- @Override
- public void end() throws InterruptedException {
+ @Override public void end() throws InterruptedException {
// noop
}
- @Override
- public void setSourceEnumerable(Enumerable<Row> enumerable) {
+ @Override public void setSourceEnumerable(Enumerable<Row> enumerable) {
this.enumerable = enumerable;
}
}
/**
* A {@link Source} that is just backed by an {@link Enumerator}. The {@link Enumerator} is closed
- * when it is finished or by calling {@link #close()}
+ * when it is finished or by calling {@link #close()}.
*/
private static class EnumerableProxySource implements Source {
@@ -359,12 +353,11 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
this.source = sink;
}
- @Override
- public Row receive() {
+ @Override public Row receive() {
if (enumerator == null) {
enumerator = source.enumerable.enumerator();
- assert enumerator != null : "Sink did not set enumerable before source was asked for "
- + "a row!";
+ assert enumerator != null
+ : "Sink did not set enumerable before source was asked for a row!";
}
if (enumerator.moveNext()) {
return enumerator.current();
@@ -375,8 +368,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
return null;
}
- @Override
- public void close() {
+ @Override public void close() {
if (this.enumerator != null) {
this.enumerator.close();
}
@@ -398,12 +390,11 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
public void end() throws InterruptedException {
}
- @Override
- public void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException {
+ @Override public void setSourceEnumerable(Enumerable<Row> enumerable)
+ throws InterruptedException {
// just copy over the source into the local list
- Enumerator<Row> enumerator = enumerable.enumerator();
- Row row;
- while (!enumerator.moveNext()) {
+ final Enumerator<Row> enumerator = enumerable.enumerator();
+ while (enumerator.moveNext()) {
this.send(enumerator.current());
}
enumerator.close();
@@ -426,8 +417,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
}
}
- @Override
- public void close() {
+ @Override public void close() {
// noop
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/core/src/main/java/org/apache/calcite/interpreter/Sink.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Sink.java b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
index 9e49ee5..6056b96 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Sink.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
@@ -28,6 +28,8 @@ public interface Sink {
void end() throws InterruptedException;
+ /** This method is temporary. It will be removed without notice. */
+ @Deprecated
void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index b8eacdd..5a6291b 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -71,15 +71,17 @@ public class StreamTest {
+ " },\n"
+ " factory: '" + clazz.getName() + "'\n"
+ " } ]\n"
- + " }\n";
+ + " }";
}
public static final String STREAM_MODEL = "{\n"
+ " version: '1.0',\n"
+ " defaultSchema: 'foodmart',\n"
+ " schemas: [\n"
- + schemaFor(STREAM_SCHEMA_NAME, OrdersStreamTableFactory.class) + ",\n"
+ + schemaFor(STREAM_SCHEMA_NAME, OrdersStreamTableFactory.class)
+ + ",\n"
+ schemaFor(INFINITE_STREAM_SCHEMA_NAME, InfiniteOrdersStreamTableFactory.class)
+ + "\n"
+ " ]\n"
+ "}";
@@ -196,16 +198,18 @@ public class StreamTest {
}
/**
- * Regression test for CALCITE-809
+ * Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-809">[CALCITE-809]
+ * TableScan does not support large/infinite scans</a>.
*/
@Test public void testInfiniteStreamsDoNotBufferInMemory() {
CalciteAssert.model(STREAM_MODEL)
- .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
- .query("select stream * from orders")
- .limit(100)
- .explainContains("EnumerableInterpreter\n"
- + " BindableTableScan(table=[[]])")
- .returnsCount(100);
+ .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
+ .query("select stream * from orders")
+ .limit(100)
+ .explainContains("EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[]])")
+ .returnsCount(100);
}
private Function<ResultSet, Void> startsWith(String... rows) {
@@ -240,15 +244,14 @@ public class StreamTest {
protected final RelProtoDataType protoRowType = new RelProtoDataType() {
public RelDataType apply(RelDataTypeFactory a0) {
return a0.builder()
- .add("ROWTIME", SqlTypeName.TIMESTAMP)
- .add("ID", SqlTypeName.INTEGER)
- .add("PRODUCT", SqlTypeName.VARCHAR, 10)
- .add("UNITS", SqlTypeName.INTEGER)
- .build();
+ .add("ROWTIME", SqlTypeName.TIMESTAMP)
+ .add("ID", SqlTypeName.INTEGER)
+ .add("PRODUCT", SqlTypeName.VARCHAR, 10)
+ .add("UNITS", SqlTypeName.INTEGER)
+ .build();
}
};
-
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return protoRowType.apply(typeFactory);
}
@@ -300,8 +303,7 @@ public class StreamTest {
return Linq4j.asEnumerable(rows);
}
- @Override
- public Table stream() {
+ @Override public Table stream() {
return new OrdersTable(rows);
}
}
@@ -316,48 +318,47 @@ public class StreamTest {
}
public Table create(SchemaPlus schema, String name,
- Map<String, Object> operand, RelDataType rowType) {
+ Map<String, Object> operand, RelDataType rowType) {
return new InfiniteOrdersTable();
}
}
- public static final Function0<Object[]> ROW_GENERATOR = new Function0() {
- private int counter = 0;
- private Iterator<String> items = Iterables.cycle("paint", "paper", "brush").iterator();
-
- @Override
- public Object[] apply() {
- return new Object[]{System.currentTimeMillis(), counter++, items.next(), 10};
- }
- };
+ public static final Function0<Object[]> ROW_GENERATOR =
+ new Function0<Object[]>() {
+ private int counter = 0;
+ private Iterator<String> items =
+ Iterables.cycle("paint", "paper", "brush").iterator();
+ @Override public Object[] apply() {
+ return new Object[]{System.currentTimeMillis(), counter++, items.next(), 10};
+ }
+ };
/**
* Table representing an infinitely larger ORDERS stream.
*/
public static class InfiniteOrdersTable extends BaseOrderStreamTable {
-
public Enumerable<Object[]> scan(DataContext root) {
return Linq4j.asEnumerable(new Iterable<Object[]>() {
- @Override
- public Iterator<Object[]> iterator() {
+ @Override public Iterator<Object[]> iterator() {
return new Iterator<Object[]>() {
- @Override
public boolean hasNext() {
return true;
}
- @Override
public Object[] next() {
return ROW_GENERATOR.apply();
}
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
};
}
});
}
- @Override
- public Table stream() {
+ @Override public Table stream() {
return this;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
index 043f094..c4d003b 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
@@ -17,12 +17,12 @@
package org.apache.calcite.linq4j;
/**
- * Simple enumerator that just delegates all calls to the passed enumerator
+ * Simple enumerator that just delegates all calls to the passed enumerator.
+ *
* @param <T> type of value to return, as passed from the delegate enumerator
*/
public class DelegatingEnumerator<T> implements Enumerator<T> {
-
- private Enumerator<T> delegate;
+ protected final Enumerator<T> delegate;
public DelegatingEnumerator(Enumerator<T> delegate) {
this.delegate = delegate;
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
index eb50cee..85f8bc6 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
@@ -206,13 +206,8 @@ public abstract class Linq4j {
}
public static <T, R> Enumerator<R> transform(final Enumerator<T> enumerator,
- final Function1<T, R> func) {
- return new DelegatingEnumerator<R>((Enumerator<R>) enumerator) {
- @Override
- public R current() {
- return func.apply((T) super.current());
- }
- };
+ final Function1<T, R> func) {
+ return new TransformedEnumerator<>(enumerator, func);
}
/**
@@ -732,6 +727,39 @@ public abstract class Linq4j {
public void close() {
}
}
+
+ /** Enumerator that applies a transform to each value from a backing
+ * enumerator.
+ *
+ * @param <T> Element type of backing enumerator
+ * @param <R> Element type
+ */
+ private static class TransformedEnumerator<T, R> implements Enumerator<R> {
+ private final Enumerator<T> enumerator;
+ private final Function1<T, R> func;
+
+ public TransformedEnumerator(Enumerator<T> enumerator,
+ Function1<T, R> func) {
+ this.enumerator = enumerator;
+ this.func = func;
+ }
+
+ public boolean moveNext() {
+ return enumerator.moveNext();
+ }
+
+ public R current() {
+ return func.apply(enumerator.current());
+ }
+
+ public void reset() {
+ enumerator.reset();
+ }
+
+ public void close() {
+ enumerator.close();
+ }
+ }
}
// End Linq4j.java