You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/02 02:09:46 UTC
[10/18] incubator-calcite git commit: [CALCITE-809] TableScan does
not support large/infinite scans (Jesse Yates)
[CALCITE-809] TableScan does not support large/infinite scans (Jesse Yates)
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/30d618d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/30d618d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/30d618d5
Branch: refs/heads/master
Commit: 30d618d5f6254b269f7a6740f78ba9b63ff79040
Parents: 5b02090
Author: Jesse Yates <je...@gmail.com>
Authored: Sun Aug 9 12:04:11 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Sep 1 16:17:15 2015 -0700
----------------------------------------------------------------------
.../apache/calcite/interpreter/Interpreter.java | 151 +++++++++++++---
.../org/apache/calcite/interpreter/Sink.java | 4 +
.../org/apache/calcite/interpreter/Source.java | 2 +
.../calcite/interpreter/TableScanNode.java | 7 +-
.../apache/calcite/test/ScannableTableTest.java | 8 +-
.../org/apache/calcite/test/StreamTest.java | 170 +++++++++++++------
.../calcite/linq4j/DelegatingEnumerator.java | 48 ++++++
.../java/org/apache/calcite/linq4j/Linq4j.java | 12 ++
8 files changed, 318 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index deaf061..4c0886e 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -18,13 +18,18 @@ package org.apache.calcite.interpreter;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.DelegatingEnumerator;
+import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.rules.CalcSplitRule;
import org.apache.calcite.rel.rules.FilterTableScanRule;
import org.apache.calcite.rel.rules.ProjectTableScanRule;
@@ -87,33 +92,31 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
public Enumerator<Object[]> enumerator() {
start();
- final ArrayDeque<Row> queue = nodes.get(rootRel).sink.list;
- return new Enumerator<Object[]>() {
- Row row;
-
- public Object[] current() {
- return row.getValues();
- }
-
- public boolean moveNext() {
- try {
- row = queue.removeFirst();
- } catch (NoSuchElementException e) {
- return false;
- }
- return true;
- }
-
- public void reset() {
- row = null;
- }
+ Sink sink = nodes.get(rootRel).sink;
+ Enumerator<Row> rows;
+ if (sink instanceof EnumerableProxySink) {
+ rows = ((EnumerableProxySink) sink).enumerable.enumerator();
+ } else {
+ final ArrayDeque<Row> queue = ((ListSink) sink).list;
+ rows = Linq4j.asEnumerable(queue).enumerator();
+ }
+ return new DelegatingEnumerator<Object[]>(Linq4j.transform(rows, rowConverter)) {
+ @Override
public void close() {
+ super.close();
Interpreter.this.close();
}
};
}
+  /**
+   * Converts a {@link Row} into the {@code Object[]} returned by
+   * {@link Row#getValues()}; used by {@link #enumerator()} to expose rows to
+   * callers.
+   */
+  private final Function1<Row, Object[]> rowConverter = new Function1<Row, Object[]>() {
+    @Override
+    public Object[] apply(Row row) {
+      return row.getValues();
+    }
+  };
+
private void start() {
// We rely on the nodes being ordered leaves first.
for (Map.Entry<RelNode, NodeInfo> entry : nodes.entrySet()) {
@@ -261,7 +264,14 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
if (x == null) {
throw new AssertionError("should be registered: " + rel);
}
- return new ListSource(x.sink);
+ Sink sink = x.sink;
+ if (sink instanceof ListSink) {
+ return new ListSource((ListSink) x.sink);
+ } else if (sink instanceof EnumerableProxySink) {
+ return new EnumerableProxySource((EnumerableProxySink) sink);
+ }
+ throw new IllegalStateException(
+ "Got a sink " + sink + " to which there is no match source type!");
}
private RelNode getInput(RelNode rel, int ordinal) {
@@ -273,9 +283,14 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
}
public Sink sink(RelNode rel) {
- final ArrayDeque<Row> queue = new ArrayDeque<Row>(1);
- final ListSink sink = new ListSink(queue);
- final NodeInfo nodeInfo = new NodeInfo(rel, sink);
+ final Sink sink;
+ if (rel instanceof TableScan) {
+ sink = new EnumerableProxySink();
+ } else {
+ final ArrayDeque<Row> queue = new ArrayDeque<Row>(1);
+ sink = new ListSink(queue);
+ }
+ NodeInfo nodeInfo = new NodeInfo(rel, sink);
nodes.put(rel, nodeInfo);
return sink;
}
@@ -291,15 +306,83 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
/** Information about a node registered in the data flow graph. */
private static class NodeInfo {
final RelNode rel;
- final ListSink sink;
+ final Sink sink;
Node node;
- public NodeInfo(RelNode rel, ListSink sink) {
+ public NodeInfo(RelNode rel, Sink sink) {
this.rel = rel;
this.sink = sink;
}
}
+ /**
+ * A sink that just proxies for an {@link org.apache.calcite.linq4j.Enumerable}. As such, it is
+ * not really a "sink" but instead just a thin layer for the {@link EnumerableProxySource} to
+ * get an enumerator.
+ * <p>
+ * It can be a little bit slower than the {@link Interpreter.ListSink} when trying to iterate
+ * over the elements of the enumerable, unless the enumerable is backed by an in-memory cache
+ * of the rows.
+ * </p>
+ */
+ private static class EnumerableProxySink implements Sink {
+
+ private Enumerable<Row> enumerable;
+
+ @Override
+ public void send(Row row) throws InterruptedException {
+ throw new UnsupportedOperationException("Row are only added through the enumerable passed "
+ + "in through #setSourceEnumerable()!");
+ }
+
+ @Override
+ public void end() throws InterruptedException {
+ // noop
+ }
+
+ @Override
+ public void setSourceEnumerable(Enumerable<Row> enumerable) {
+ this.enumerable = enumerable;
+ }
+ }
+
+  /**
+   * A {@link Source} that reads rows from the {@link Enumerable} held by an
+   * {@link EnumerableProxySink}. The {@link Enumerator} is opened lazily on the
+   * first call to {@link #receive()} and is closed when it is exhausted or by
+   * calling {@link #close()}.
+   */
+  private static class EnumerableProxySource implements Source {
+
+    private Enumerator<Row> enumerator;
+    private final EnumerableProxySink source;
+
+    public EnumerableProxySource(EnumerableProxySink sink) {
+      this.source = sink;
+    }
+
+    /**
+     * Returns the next row, or null when the underlying enumerator has no more
+     * rows.
+     *
+     * @throws IllegalStateException if the sink's enumerable has not been set
+     */
+    @Override
+    public Row receive() {
+      if (enumerator == null) {
+        // Validate the enumerable before dereferencing it; checking the
+        // enumerator afterwards would be too late to report a missing enumerable.
+        final Enumerable<Row> enumerable = source.enumerable;
+        if (enumerable == null) {
+          throw new IllegalStateException(
+              "Sink did not set enumerable before source was asked for a row!");
+        }
+        enumerator = enumerable.enumerator();
+      }
+      if (enumerator.moveNext()) {
+        return enumerator.current();
+      }
+      // close the enumerator once we have gone through everything
+      enumerator.close();
+      // NOTE: clearing the field means a later receive() opens a fresh enumerator
+      this.enumerator = null;
+      return null;
+    }
+
+    /** Closes the enumerator if one is currently open. */
+    @Override
+    public void close() {
+      if (this.enumerator != null) {
+        this.enumerator.close();
+      }
+    }
+  }
+
/** Implementation of {@link Sink} using a {@link java.util.ArrayDeque}. */
private static class ListSink implements Sink {
final ArrayDeque<Row> list;
@@ -314,6 +397,17 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
public void end() throws InterruptedException {
}
+
+    /**
+     * Drains {@code enumerable} into this sink by passing every row to
+     * {@link #send(Row)}, then closes the enumerator.
+     *
+     * @param enumerable source of the rows to copy
+     * @throws InterruptedException if {@link #send(Row)} is interrupted
+     */
+    @Override
+    public void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException {
+      // just copy over the source into the local list
+      final Enumerator<Row> enumerator = enumerable.enumerator();
+      try {
+        // advance first, then read: current() is only valid after moveNext() returns true
+        while (enumerator.moveNext()) {
+          this.send(enumerator.current());
+        }
+      } finally {
+        // release the source even if send() throws
+        enumerator.close();
+      }
+    }
}
/** Implementation of {@link Source} using a {@link java.util.ArrayDeque}. */
@@ -331,6 +425,11 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
return null;
}
}
+
+ @Override
+ public void close() {
+ // noop
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/Sink.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Sink.java b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
index adcbac7..9e49ee5 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Sink.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.interpreter;
+import org.apache.calcite.linq4j.Enumerable;
+
/**
* Sink to which to send rows.
*
@@ -25,6 +27,8 @@ public interface Sink {
void send(Row row) throws InterruptedException;
void end() throws InterruptedException;
+
+ void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException;
}
// End Sink.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/Source.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Source.java b/core/src/main/java/org/apache/calcite/interpreter/Source.java
index f020343..75b6c1b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Source.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Source.java
@@ -24,6 +24,8 @@ package org.apache.calcite.interpreter;
public interface Source {
/** Reads a row. Null means end of data. */
Row receive();
+
+ void close();
}
// End Source.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
index 2b1865f..2824b37 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -18,7 +18,6 @@ package org.apache.calcite.interpreter;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.linq4j.function.Predicate1;
@@ -71,11 +70,7 @@ public class TableScanNode implements Node {
public void run() throws InterruptedException {
- final Enumerator<Row> enumerator = enumerable.enumerator();
- while (enumerator.moveNext()) {
- sink.send(enumerator.current());
- }
- enumerator.close();
+ sink.setSourceEnumerable(enumerable);
sink.end();
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java b/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
index 377fe67..2c24295 100644
--- a/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
+++ b/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
@@ -205,6 +205,7 @@ public class ScannableTableTest {
assertThat(CalciteAssert.toString(resultSet),
equalTo("i=4; k=1942\n"
+ "i=6; k=1943\n"));
+ resultSet.close();
assertThat(buf.toString(),
equalTo("returnCount=4, projects=[0, 2]"));
buf.setLength(0);
@@ -274,9 +275,12 @@ public class ScannableTableTest {
final Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(
"select \"k\" from \"s\".\"beatles2\" where \"k\" > 1941");
- assertThat(buf.toString(),
- equalTo("returnCount=4, projects=[2]"));
assertThat(CalciteAssert.toString(resultSet), equalTo("k=1942\nk=1943\n"));
+ // have to iterate (CalciteAssert.toString) and then close the result set b/c it is backed by
+ // an enumerable that only populates the info buffer (buf) on close
+ resultSet.close();
+ assertThat(buf.toString(),
+ equalTo("returnCount=4, projects=[2]"));
}
/** Table that returns one column via the {@link ScannableTable} interface. */
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index d4f4117..b8eacdd 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -20,6 +20,7 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function0;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -38,6 +39,7 @@ import org.apache.calcite.util.ImmutableBitSet;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import org.junit.Ignore;
import org.junit.Test;
@@ -45,6 +47,7 @@ import org.junit.Test;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.Iterator;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -54,23 +57,29 @@ import static org.junit.Assert.assertThat;
* Tests for streaming queries.
*/
public class StreamTest {
- public static final String STREAM_SCHEMA = " {\n"
- + " name: 'STREAMS',\n"
+ public static final String STREAM_SCHEMA_NAME = "STREAMS";
+ public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
+
+ private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
+ return " {\n"
+ + " name: '" + name + "',\n"
+ " tables: [ {\n"
+ " type: 'custom',\n"
+ " name: 'ORDERS',\n"
+ " stream: {\n"
+ " stream: true\n"
+ " },\n"
- + " factory: '" + OrdersStreamTableFactory.class.getName() + "'\n"
+ + " factory: '" + clazz.getName() + "'\n"
+ " } ]\n"
+ " }\n";
+ }
public static final String STREAM_MODEL = "{\n"
+ " version: '1.0',\n"
+ " defaultSchema: 'foodmart',\n"
+ " schemas: [\n"
- + STREAM_SCHEMA
+ + schemaFor(STREAM_SCHEMA_NAME, OrdersStreamTableFactory.class) + ",\n"
+ + schemaFor(INFINITE_STREAM_SCHEMA_NAME, InfiniteOrdersStreamTableFactory.class)
+ " ]\n"
+ "}";
@@ -80,7 +89,7 @@ public class StreamTest {
.query("select stream * from orders")
.convertContains("LogicalDelta\n"
+ " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3])\n"
- + " EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+ + " LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
.explainContains("EnumerableInterpreter\n"
+ " BindableTableScan(table=[[]])")
.returns(
@@ -97,7 +106,7 @@ public class StreamTest {
"LogicalDelta\n"
+ " LogicalProject(PRODUCT=[$2])\n"
+ " LogicalFilter(condition=[>($3, 6)])\n"
- + " EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+ + " LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
.explainContains(
"EnumerableCalc(expr#0..3=[{inputs}], expr#4=[6], expr#5=[>($t3, $t4)], PRODUCT=[$t2], $condition=[$t5])\n"
+ " EnumerableInterpreter\n"
@@ -120,7 +129,7 @@ public class StreamTest {
+ " LogicalFilter(condition=[>($2, 1)])\n"
+ " LogicalAggregate(group=[{0, 1}], C=[COUNT()])\n"
+ " LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2])\n"
- + " EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+ + " LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
.explainContains(
"EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])\n"
+ " EnumerableAggregate(group=[{0, 1}], C=[COUNT()])\n"
@@ -142,7 +151,7 @@ public class StreamTest {
"LogicalDelta\n"
+ " LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+ " LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2], UNITS=[$3])\n"
- + " EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+ + " LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
.explainContains(
"EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+ " EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2], UNITS=[$t3])\n"
@@ -186,6 +195,19 @@ public class StreamTest {
"ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=3"));
}
+ /**
+ * Regression test for CALCITE-809
+ */
+ @Test public void testInfiniteStreamsDoNotBufferInMemory() {
+ CalciteAssert.model(STREAM_MODEL)
+ .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
+ .query("select stream * from orders")
+ .limit(100)
+ .explainContains("EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[]])")
+ .returnsCount(100);
+ }
+
private Function<ResultSet, Void> startsWith(String... rows) {
final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
return new Function<ResultSet, Void>() {
@@ -210,6 +232,38 @@ public class StreamTest {
};
}
+ /**
+ * Base table for the Orders table. Manages the base schema used for the test tables and common
+ * functions.
+ */
+ private abstract static class BaseOrderStreamTable implements ScannableTable, StreamableTable {
+ protected final RelProtoDataType protoRowType = new RelProtoDataType() {
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder()
+ .add("ROWTIME", SqlTypeName.TIMESTAMP)
+ .add("ID", SqlTypeName.INTEGER)
+ .add("PRODUCT", SqlTypeName.VARCHAR, 10)
+ .add("UNITS", SqlTypeName.INTEGER)
+ .build();
+ }
+ };
+
+
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return protoRowType.apply(typeFactory);
+ }
+
+ public Statistic getStatistic() {
+ return Statistics.of(100d,
+ ImmutableList.<ImmutableBitSet>of(),
+ RelCollations.createSingleton(0));
+ }
+
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+ }
+
/** Mock table that returns a stream of orders from a fixed array. */
@SuppressWarnings("UnusedDeclaration")
public static class OrdersStreamTableFactory implements TableFactory<Table> {
@@ -219,16 +273,6 @@ public class StreamTest {
public Table create(SchemaPlus schema, String name,
Map<String, Object> operand, RelDataType rowType) {
- final RelProtoDataType protoRowType = new RelProtoDataType() {
- public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder()
- .add("ROWTIME", SqlTypeName.TIMESTAMP)
- .add("ID", SqlTypeName.INTEGER)
- .add("PRODUCT", SqlTypeName.VARCHAR, 10)
- .add("UNITS", SqlTypeName.INTEGER)
- .build();
- }
- };
final ImmutableList<Object[]> rows = ImmutableList.of(
new Object[] {ts(10, 15, 0), 1, "paint", 10},
new Object[] {ts(10, 24, 15), 2, "paper", 5},
@@ -236,25 +280,7 @@ public class StreamTest {
new Object[] {ts(10, 58, 0), 4, "paint", 3},
new Object[] {ts(11, 10, 0), 5, "paint", 3});
- return new StreamableTable() {
- public Table stream() {
- return new OrdersTable(protoRowType, rows);
- }
-
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return protoRowType.apply(typeFactory);
- }
-
- public Statistic getStatistic() {
- return Statistics.of(100d,
- ImmutableList.<ImmutableBitSet>of(),
- RelCollations.createSingleton(0));
- }
-
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.TABLE;
- }
- };
+ return new OrdersTable(rows);
}
private Object ts(int h, int m, int s) {
@@ -263,13 +289,10 @@ public class StreamTest {
}
/** Table representing the ORDERS stream. */
- public static class OrdersTable implements ScannableTable {
- private final RelProtoDataType protoRowType;
+ public static class OrdersTable extends BaseOrderStreamTable {
private final ImmutableList<Object[]> rows;
- public OrdersTable(RelProtoDataType protoRowType,
- ImmutableList<Object[]> rows) {
- this.protoRowType = protoRowType;
+ public OrdersTable(ImmutableList<Object[]> rows) {
this.rows = rows;
}
@@ -277,18 +300,65 @@ public class StreamTest {
return Linq4j.asEnumerable(rows);
}
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return protoRowType.apply(typeFactory);
+ @Override
+ public Table stream() {
+ return new OrdersTable(rows);
}
+ }
- public Statistic getStatistic() {
- return Statistics.of(100d,
- ImmutableList.<ImmutableBitSet>of(),
- RelCollations.createSingleton(0));
+ /**
+ * Factory for a mock table that returns an endless stream of orders.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public static class InfiniteOrdersStreamTableFactory implements TableFactory<Table> {
+ // public constructor, per factory contract
+ public InfiniteOrdersStreamTableFactory() {
}
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.STREAM;
+ public Table create(SchemaPlus schema, String name,
+ Map<String, Object> operand, RelDataType rowType) {
+ return new InfiniteOrdersTable();
+ }
+ }
+
+  /**
+   * Generates order rows on demand. Each row holds the current time in
+   * milliseconds, an incrementing counter, a product name cycling through
+   * "paint", "paper" and "brush", and the constant 10.
+   *
+   * <p>The generator keeps mutable state (the counter and the cycling
+   * iterator) and does no synchronization of its own.
+   */
+  public static final Function0<Object[]> ROW_GENERATOR = new Function0<Object[]>() {
+    private int counter = 0;
+    private final Iterator<String> items =
+        Iterables.cycle("paint", "paper", "brush").iterator();
+
+    @Override
+    public Object[] apply() {
+      return new Object[]{System.currentTimeMillis(), counter++, items.next(), 10};
+    }
+  };
+
+
+ /**
+ * Table representing an infinitely large ORDERS stream.
+ */
+ public static class InfiniteOrdersTable extends BaseOrderStreamTable {
+
+ public Enumerable<Object[]> scan(DataContext root) {
+ return Linq4j.asEnumerable(new Iterable<Object[]>() {
+ @Override
+ public Iterator<Object[]> iterator() {
+ return new Iterator<Object[]>() {
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public Object[] next() {
+ return ROW_GENERATOR.apply();
+ }
+ };
+ }
+ });
+ }
+
+ @Override
+ public Table stream() {
+ return this;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
new file mode 100644
index 0000000..043f094
--- /dev/null
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j;
+
+/**
+ * Simple enumerator that just delegates all calls to the passed enumerator.
+ * Subclasses can override individual methods to adjust the delegate's
+ * behavior.
+ *
+ * @param <T> type of value to return, as passed from the delegate enumerator
+ */
+public class DelegatingEnumerator<T> implements Enumerator<T> {
+
+  /** Enumerator that every call is forwarded to; never reassigned. */
+  private final Enumerator<T> delegate;
+
+  /**
+   * Creates an enumerator that forwards to {@code delegate}.
+   *
+   * @param delegate enumerator to forward all calls to
+   */
+  public DelegatingEnumerator(Enumerator<T> delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override public T current() {
+    return delegate.current();
+  }
+
+  @Override public boolean moveNext() {
+    return delegate.moveNext();
+  }
+
+  @Override public void reset() {
+    delegate.reset();
+  }
+
+  @Override public void close() {
+    delegate.close();
+  }
+}
+
+// End DelegatingEnumerator.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
index db897d9..eb50cee 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.linq4j;
+import org.apache.calcite.linq4j.function.Function1;
+
import com.google.common.collect.Lists;
import java.io.Closeable;
@@ -203,6 +205,16 @@ public abstract class Linq4j {
return new ListEnumerator<V>(list);
}
+  /**
+   * Returns an enumerator that applies {@code func} to the current element of
+   * {@code enumerator} each time {@link Enumerator#current()} is called.
+   * Movement, reset and close are forwarded unchanged to {@code enumerator}.
+   *
+   * @param enumerator source enumerator
+   * @param func function applied to each source element
+   * @param <T> element type of the source enumerator
+   * @param <R> element type of the returned enumerator
+   * @return enumerator over the transformed elements
+   */
+  public static <T, R> Enumerator<R> transform(final Enumerator<T> enumerator,
+      final Function1<T, R> func) {
+    // Implement Enumerator<R> directly rather than casting the source to
+    // Enumerator<R>, so no unchecked casts are needed.
+    return new Enumerator<R>() {
+      @Override public R current() {
+        return func.apply(enumerator.current());
+      }
+
+      @Override public boolean moveNext() {
+        return enumerator.moveNext();
+      }
+
+      @Override public void reset() {
+        enumerator.reset();
+      }
+
+      @Override public void close() {
+        enumerator.close();
+      }
+    };
+  }
+
/**
* Converts the elements of a given Iterable to the specified type.
*