You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/02 02:09:46 UTC

[10/18] incubator-calcite git commit: [CALCITE-809] TableScan does not support large/infinite scans (Jesse Yates)

[CALCITE-809] TableScan does not support large/infinite scans (Jesse Yates)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/30d618d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/30d618d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/30d618d5

Branch: refs/heads/master
Commit: 30d618d5f6254b269f7a6740f78ba9b63ff79040
Parents: 5b02090
Author: Jesse Yates <je...@gmail.com>
Authored: Sun Aug 9 12:04:11 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Sep 1 16:17:15 2015 -0700

----------------------------------------------------------------------
 .../apache/calcite/interpreter/Interpreter.java | 151 +++++++++++++---
 .../org/apache/calcite/interpreter/Sink.java    |   4 +
 .../org/apache/calcite/interpreter/Source.java  |   2 +
 .../calcite/interpreter/TableScanNode.java      |   7 +-
 .../apache/calcite/test/ScannableTableTest.java |   8 +-
 .../org/apache/calcite/test/StreamTest.java     | 170 +++++++++++++------
 .../calcite/linq4j/DelegatingEnumerator.java    |  48 ++++++
 .../java/org/apache/calcite/linq4j/Linq4j.java  |  12 ++
 8 files changed, 318 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index deaf061..4c0886e 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -18,13 +18,18 @@ package org.apache.calcite.interpreter;
 
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.DelegatingEnumerator;
+import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.rules.CalcSplitRule;
 import org.apache.calcite.rel.rules.FilterTableScanRule;
 import org.apache.calcite.rel.rules.ProjectTableScanRule;
@@ -87,33 +92,31 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
 
   public Enumerator<Object[]> enumerator() {
     start();
-    final ArrayDeque<Row> queue = nodes.get(rootRel).sink.list;
-    return new Enumerator<Object[]>() {
-      Row row;
-
-      public Object[] current() {
-        return row.getValues();
-      }
-
-      public boolean moveNext() {
-        try {
-          row = queue.removeFirst();
-        } catch (NoSuchElementException e) {
-          return false;
-        }
-        return true;
-      }
-
-      public void reset() {
-        row = null;
-      }
+    Sink sink = nodes.get(rootRel).sink;
+    Enumerator<Row> rows;
+    if (sink instanceof EnumerableProxySink) {
+      rows = ((EnumerableProxySink) sink).enumerable.enumerator();
+    } else {
+      final ArrayDeque<Row> queue = ((ListSink) sink).list;
+      rows = Linq4j.asEnumerable(queue).enumerator();
+    }
 
+    return new DelegatingEnumerator<Object[]>(Linq4j.transform(rows, rowConverter)) {
+      @Override
       public void close() {
+        super.close();
         Interpreter.this.close();
       }
     };
   }
 
+  private Function1<Row, Object[]> rowConverter = new Function1<Row, Object[]>() {
+    @Override
+    public Object[] apply(Row row) {
+      return row.getValues();
+    }
+  };
+
   private void start() {
     // We rely on the nodes being ordered leaves first.
     for (Map.Entry<RelNode, NodeInfo> entry : nodes.entrySet()) {
@@ -261,7 +264,14 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
     if (x == null) {
       throw new AssertionError("should be registered: " + rel);
     }
-    return new ListSource(x.sink);
+    Sink sink = x.sink;
+    if (sink instanceof ListSink) {
+      return new ListSource((ListSink) x.sink);
+    } else if (sink instanceof EnumerableProxySink) {
+      return new EnumerableProxySource((EnumerableProxySink) sink);
+    }
+    throw new IllegalStateException(
+      "Got a sink " + sink + " to which there is no match source type!");
   }
 
   private RelNode getInput(RelNode rel, int ordinal) {
@@ -273,9 +283,14 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
   }
 
   public Sink sink(RelNode rel) {
-    final ArrayDeque<Row> queue = new ArrayDeque<Row>(1);
-    final ListSink sink = new ListSink(queue);
-    final NodeInfo nodeInfo = new NodeInfo(rel, sink);
+    final Sink sink;
+    if (rel instanceof TableScan) {
+      sink = new EnumerableProxySink();
+    } else {
+      final ArrayDeque<Row> queue = new ArrayDeque<Row>(1);
+      sink = new ListSink(queue);
+    }
+    NodeInfo nodeInfo = new NodeInfo(rel, sink);
     nodes.put(rel, nodeInfo);
     return sink;
   }
@@ -291,15 +306,83 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
   /** Information about a node registered in the data flow graph. */
   private static class NodeInfo {
     final RelNode rel;
-    final ListSink sink;
+    final Sink sink;
     Node node;
 
-    public NodeInfo(RelNode rel, ListSink sink) {
+    public NodeInfo(RelNode rel, Sink sink) {
       this.rel = rel;
       this.sink = sink;
     }
   }
 
+  /**
+   * A sink that just proxies for an {@link org.apache.calcite.linq4j.Enumerable}. It is not
+   * really a "sink": it only holds the enumerable handed to it, acting as a thin layer from
+   * which the {@link EnumerableProxySource} gets an enumerator.
+   * <p>
+   * Iterating over the elements of the enumerable can be a little slower than with the
+   * {@link Interpreter.ListSink}, unless the enumerable is backed by an in-memory cache
+   * of the rows.
+   * </p>
+   */
+  private static class EnumerableProxySink implements Sink {
+
+    private Enumerable<Row> enumerable;
+
+    @Override
+    public void send(Row row) throws InterruptedException {
+      throw new UnsupportedOperationException("Row are only added through the enumerable passed "
+                                              + "in through #setSourceEnumerable()!");
+    }
+
+    @Override
+    public void end() throws InterruptedException {
+      // noop: this sink holds no rows of its own, so there is nothing to finish
+    }
+
+    @Override
+    public void setSourceEnumerable(Enumerable<Row> enumerable) {
+      this.enumerable = enumerable;
+    }
+  }
+
+  /**
+   * A {@link Source} backed by an {@link Enumerator} that is obtained lazily from the sink's
+   * enumerable on the first {@link #receive()}, and closed when exhausted or on {@link #close()}.
+   */
+  private static class EnumerableProxySource implements Source {
+
+    private Enumerator<Row> enumerator;
+    private final EnumerableProxySink source;
+
+    public EnumerableProxySource(EnumerableProxySink sink) {
+      this.source = sink;
+    }
+
+    @Override
+    public Row receive() {
+      if (enumerator == null) {
+        assert source.enumerable != null
+            : "Sink did not set enumerable before source was asked for a row!";
+        enumerator = source.enumerable.enumerator();
+      }
+      if (enumerator.moveNext()) {
+        return enumerator.current();
+      }
+      // close the enumerator once we have gone through everything
+      enumerator.close();
+      this.enumerator = null;
+      return null;
+    }
+
+    @Override
+    public void close() {
+      if (this.enumerator != null) {
+        this.enumerator.close();
+      }
+    }
+  }
+
   /** Implementation of {@link Sink} using a {@link java.util.ArrayDeque}. */
   private static class ListSink implements Sink {
     final ArrayDeque<Row> list;
@@ -314,6 +397,17 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
 
     public void end() throws InterruptedException {
     }
+
+    @Override
+    public void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException {
+      // eagerly copy every row of the source into the local list, then release the source
+      Enumerator<Row> enumerator = enumerable.enumerator();
+      // moveNext() returns true while a row is available, so loop on it directly (not negated)
+      while (enumerator.moveNext()) {
+        this.send(enumerator.current());
+      }
+      enumerator.close();
+    }
   }
 
   /** Implementation of {@link Source} using a {@link java.util.ArrayDeque}. */
@@ -331,6 +425,11 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
         return null;
       }
     }
+
+    @Override
+    public void close() {
+      // noop
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/Sink.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Sink.java b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
index adcbac7..9e49ee5 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Sink.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.interpreter;
 
+import org.apache.calcite.linq4j.Enumerable;
+
 /**
  * Sink to which to send rows.
  *
@@ -25,6 +27,8 @@ public interface Sink {
   void send(Row row) throws InterruptedException;
 
   void end() throws InterruptedException;
+
+  void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException;
 }
 
 // End Sink.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/Source.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Source.java b/core/src/main/java/org/apache/calcite/interpreter/Source.java
index f020343..75b6c1b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Source.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Source.java
@@ -24,6 +24,8 @@ package org.apache.calcite.interpreter;
 public interface Source {
   /** Reads a row. Null means end of data. */
   Row receive();
+
+  void close();
 }
 
 // End Source.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
index 2b1865f..2824b37 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -18,7 +18,6 @@ package org.apache.calcite.interpreter;
 
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.Queryable;
 import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.linq4j.function.Predicate1;
@@ -71,11 +70,7 @@ public class TableScanNode implements Node {
 
 
   public void run() throws InterruptedException {
-    final Enumerator<Row> enumerator = enumerable.enumerator();
-    while (enumerator.moveNext()) {
-      sink.send(enumerator.current());
-    }
-    enumerator.close();
+    sink.setSourceEnumerable(enumerable);
     sink.end();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java b/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
index 377fe67..2c24295 100644
--- a/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
+++ b/core/src/test/java/org/apache/calcite/test/ScannableTableTest.java
@@ -205,6 +205,7 @@ public class ScannableTableTest {
     assertThat(CalciteAssert.toString(resultSet),
         equalTo("i=4; k=1942\n"
             + "i=6; k=1943\n"));
+    resultSet.close();
     assertThat(buf.toString(),
         equalTo("returnCount=4, projects=[0, 2]"));
     buf.setLength(0);
@@ -274,9 +275,12 @@ public class ScannableTableTest {
     final Statement statement = connection.createStatement();
     ResultSet resultSet = statement.executeQuery(
         "select \"k\" from \"s\".\"beatles2\" where \"k\" > 1941");
-    assertThat(buf.toString(),
-        equalTo("returnCount=4, projects=[2]"));
     assertThat(CalciteAssert.toString(resultSet), equalTo("k=1942\nk=1943\n"));
+    // have to iterate (CalciteAssert.toString) and then close the result set b/c it is backed by
+    // an enumerable that only populates the info buffer (buf) on close
+    resultSet.close();
+    assertThat(buf.toString(),
+      equalTo("returnCount=4, projects=[2]"));
   }
 
   /** Table that returns one column via the {@link ScannableTable} interface. */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index d4f4117..b8eacdd 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -20,6 +20,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function0;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -38,6 +39,7 @@ import org.apache.calcite.util.ImmutableBitSet;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 import org.junit.Ignore;
 import org.junit.Test;
@@ -45,6 +47,7 @@ import org.junit.Test;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.Iterator;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -54,23 +57,29 @@ import static org.junit.Assert.assertThat;
  * Tests for streaming queries.
  */
 public class StreamTest {
-  public static final String STREAM_SCHEMA = "     {\n"
-      + "       name: 'STREAMS',\n"
+  public static final String STREAM_SCHEMA_NAME = "STREAMS";
+  public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
+
+  private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
+    return "     {\n"
+      + "       name: '" + name + "',\n"
       + "       tables: [ {\n"
       + "         type: 'custom',\n"
       + "         name: 'ORDERS',\n"
       + "         stream: {\n"
       + "           stream: true\n"
       + "         },\n"
-      + "         factory: '" + OrdersStreamTableFactory.class.getName() + "'\n"
+      + "         factory: '" + clazz.getName() + "'\n"
       + "       } ]\n"
       + "     }\n";
+  }
 
   public static final String STREAM_MODEL = "{\n"
       + "  version: '1.0',\n"
       + "  defaultSchema: 'foodmart',\n"
       + "   schemas: [\n"
-      + STREAM_SCHEMA
+      + schemaFor(STREAM_SCHEMA_NAME, OrdersStreamTableFactory.class) + ",\n"
+      + schemaFor(INFINITE_STREAM_SCHEMA_NAME, InfiniteOrdersStreamTableFactory.class)
       + "   ]\n"
       + "}";
 
@@ -80,7 +89,7 @@ public class StreamTest {
         .query("select stream * from orders")
         .convertContains("LogicalDelta\n"
             + "  LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3])\n"
-            + "    EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+            + "    LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
         .explainContains("EnumerableInterpreter\n"
             + "  BindableTableScan(table=[[]])")
         .returns(
@@ -97,7 +106,7 @@ public class StreamTest {
             "LogicalDelta\n"
                 + "  LogicalProject(PRODUCT=[$2])\n"
                 + "    LogicalFilter(condition=[>($3, 6)])\n"
-                + "      EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+                + "      LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
         .explainContains(
             "EnumerableCalc(expr#0..3=[{inputs}], expr#4=[6], expr#5=[>($t3, $t4)], PRODUCT=[$t2], $condition=[$t5])\n"
                 + "  EnumerableInterpreter\n"
@@ -120,7 +129,7 @@ public class StreamTest {
                 + "  LogicalFilter(condition=[>($2, 1)])\n"
                 + "    LogicalAggregate(group=[{0, 1}], C=[COUNT()])\n"
                 + "      LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2])\n"
-                + "        EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+                + "        LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
         .explainContains(
             "EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])\n"
                 + "  EnumerableAggregate(group=[{0, 1}], C=[COUNT()])\n"
@@ -142,7 +151,7 @@ public class StreamTest {
             "LogicalDelta\n"
                 + "  LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
                 + "    LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2], UNITS=[$3])\n"
-                + "      EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+                + "      LogicalTableScan(table=[[STREAMS, ORDERS]])\n")
         .explainContains(
             "EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
                 + "  EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2], UNITS=[$t3])\n"
@@ -186,6 +195,19 @@ public class StreamTest {
                 "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=3"));
   }
 
+  /**
+   * Regression test for CALCITE-809
+   */
+  @Test public void testInfiniteStreamsDoNotBufferInMemory() {
+    CalciteAssert.model(STREAM_MODEL)
+                 .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
+                 .query("select stream * from orders")
+                 .limit(100)
+                 .explainContains("EnumerableInterpreter\n"
+                                  + "  BindableTableScan(table=[[]])")
+                 .returnsCount(100);
+  }
+
   private Function<ResultSet, Void> startsWith(String... rows) {
     final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
     return new Function<ResultSet, Void>() {
@@ -210,6 +232,38 @@ public class StreamTest {
     };
   }
 
+  /**
+   * Base table for the Orders table. Manages the base schema used for the test tables and common
+   * functions.
+   */
+  private abstract static class BaseOrderStreamTable implements ScannableTable, StreamableTable {
+    protected final RelProtoDataType protoRowType = new RelProtoDataType() {
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+                 .add("ROWTIME", SqlTypeName.TIMESTAMP)
+                 .add("ID", SqlTypeName.INTEGER)
+                 .add("PRODUCT", SqlTypeName.VARCHAR, 10)
+                 .add("UNITS", SqlTypeName.INTEGER)
+                 .build();
+      }
+    };
+
+
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return protoRowType.apply(typeFactory);
+    }
+
+    public Statistic getStatistic() {
+      return Statistics.of(100d,
+        ImmutableList.<ImmutableBitSet>of(),
+        RelCollations.createSingleton(0));
+    }
+
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
+
   /** Mock table that returns a stream of orders from a fixed array. */
   @SuppressWarnings("UnusedDeclaration")
   public static class OrdersStreamTableFactory implements TableFactory<Table> {
@@ -219,16 +273,6 @@ public class StreamTest {
 
     public Table create(SchemaPlus schema, String name,
         Map<String, Object> operand, RelDataType rowType) {
-      final RelProtoDataType protoRowType = new RelProtoDataType() {
-        public RelDataType apply(RelDataTypeFactory a0) {
-          return a0.builder()
-              .add("ROWTIME", SqlTypeName.TIMESTAMP)
-              .add("ID", SqlTypeName.INTEGER)
-              .add("PRODUCT", SqlTypeName.VARCHAR, 10)
-              .add("UNITS", SqlTypeName.INTEGER)
-              .build();
-        }
-      };
       final ImmutableList<Object[]> rows = ImmutableList.of(
           new Object[] {ts(10, 15, 0), 1, "paint", 10},
           new Object[] {ts(10, 24, 15), 2, "paper", 5},
@@ -236,25 +280,7 @@ public class StreamTest {
           new Object[] {ts(10, 58, 0), 4, "paint", 3},
           new Object[] {ts(11, 10, 0), 5, "paint", 3});
 
-      return new StreamableTable() {
-        public Table stream() {
-          return new OrdersTable(protoRowType, rows);
-        }
-
-        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-          return protoRowType.apply(typeFactory);
-        }
-
-        public Statistic getStatistic() {
-          return Statistics.of(100d,
-              ImmutableList.<ImmutableBitSet>of(),
-              RelCollations.createSingleton(0));
-        }
-
-        public Schema.TableType getJdbcTableType() {
-          return Schema.TableType.TABLE;
-        }
-      };
+      return new OrdersTable(rows);
     }
 
     private Object ts(int h, int m, int s) {
@@ -263,13 +289,10 @@ public class StreamTest {
   }
 
   /** Table representing the ORDERS stream. */
-  public static class OrdersTable implements ScannableTable {
-    private final RelProtoDataType protoRowType;
+  public static class OrdersTable extends BaseOrderStreamTable {
     private final ImmutableList<Object[]> rows;
 
-    public OrdersTable(RelProtoDataType protoRowType,
-        ImmutableList<Object[]> rows) {
-      this.protoRowType = protoRowType;
+    public OrdersTable(ImmutableList<Object[]> rows) {
       this.rows = rows;
     }
 
@@ -277,18 +300,65 @@ public class StreamTest {
       return Linq4j.asEnumerable(rows);
     }
 
-    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      return protoRowType.apply(typeFactory);
+    @Override
+    public Table stream() {
+      return new OrdersTable(rows);
     }
+  }
 
-    public Statistic getStatistic() {
-      return Statistics.of(100d,
-          ImmutableList.<ImmutableBitSet>of(),
-          RelCollations.createSingleton(0));
+  /**
+   * Factory for a mock table that returns an unbounded, generated stream of orders.
+   */
+  @SuppressWarnings("UnusedDeclaration")
+  public static class InfiniteOrdersStreamTableFactory implements TableFactory<Table> {
+    // public constructor, per factory contract
+    public InfiniteOrdersStreamTableFactory() {
     }
 
-    public Schema.TableType getJdbcTableType() {
-      return Schema.TableType.STREAM;
+    public Table create(SchemaPlus schema, String name,
+      Map<String, Object> operand, RelDataType rowType) {
+      return new InfiniteOrdersTable();
+    }
+  }
+
+  public static final Function0<Object[]> ROW_GENERATOR = new Function0<Object[]>() {
+    private int counter = 0;
+    private Iterator<String> items = Iterables.cycle("paint", "paper", "brush").iterator();
+    // each call yields {current time in millis, next counter value, next product in the cycle, 10}
+    @Override
+    public Object[] apply() {
+      return new Object[]{System.currentTimeMillis(), counter++, items.next(), 10};
+    }
+  };
+
+
+  /**
+   * Table representing an infinitely large ORDERS stream.
+   */
+  public static class InfiniteOrdersTable extends BaseOrderStreamTable {
+
+    public Enumerable<Object[]> scan(DataContext root) {
+      return Linq4j.asEnumerable(new Iterable<Object[]>() {
+        @Override
+        public Iterator<Object[]> iterator() {
+          return new Iterator<Object[]>() {
+            @Override
+            public boolean hasNext() {
+              return true;
+            }
+
+            @Override
+            public Object[] next() {
+              return ROW_GENERATOR.apply();
+            }
+          };
+        }
+      });
+    }
+
+    @Override
+    public Table stream() {
+      return this;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
new file mode 100644
index 0000000..043f094
--- /dev/null
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j;
+
+/**
+ * Simple enumerator that just delegates all calls to the passed enumerator
+ * @param <T> type of value to return, as passed from the delegate enumerator
+ */
+public class DelegatingEnumerator<T> implements Enumerator<T> {
+
+  private Enumerator<T> delegate;
+
+  public DelegatingEnumerator(Enumerator<T> delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override public T current() {
+    return delegate.current();
+  }
+
+  @Override public boolean moveNext() {
+    return delegate.moveNext();
+  }
+
+  @Override public void reset() {
+    delegate.reset();
+  }
+
+  @Override public void close() {
+    delegate.close();
+  }
+}
+
+// End DelegatingEnumerator.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/30d618d5/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
index db897d9..eb50cee 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.linq4j;
 
+import org.apache.calcite.linq4j.function.Function1;
+
 import com.google.common.collect.Lists;
 
 import java.io.Closeable;
@@ -203,6 +205,16 @@ public abstract class Linq4j {
     return new ListEnumerator<V>(list);
   }
 
+  public static <T, R> Enumerator<R> transform(final Enumerator<T> enumerator,
+    final Function1<T, R> func) {
+    return new DelegatingEnumerator<R>((Enumerator<R>) enumerator) {
+      @Override
+      public R current() {
+        return func.apply((T) super.current());
+      }
+    };
+  }
+
   /**
    * Converts the elements of a given Iterable to the specified type.
    *