You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/02 02:09:42 UTC

[06/18] incubator-calcite git commit: Fix up [CALCITE-809], deprecating Sink.setSourceEnumerable

Fix up [CALCITE-809], deprecating Sink.setSourceEnumerable


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/a3da6915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/a3da6915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/a3da6915

Branch: refs/heads/master
Commit: a3da69153b030552abf87f279487ebda4a5ae69c
Parents: 30d618d
Author: Julian Hyde <jh...@apache.org>
Authored: Mon Aug 10 15:11:24 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Sep 1 16:17:15 2015 -0700

----------------------------------------------------------------------
 .../apache/calcite/interpreter/Interpreter.java | 60 +++++++----------
 .../org/apache/calcite/interpreter/Sink.java    |  2 +
 .../org/apache/calcite/test/StreamTest.java     | 69 ++++++++++----------
 .../calcite/linq4j/DelegatingEnumerator.java    |  6 +-
 .../java/org/apache/calcite/linq4j/Linq4j.java  | 42 ++++++++++--
 5 files changed, 100 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index 4c0886e..e4c2cfe 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -102,20 +102,19 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
     }
 
     return new DelegatingEnumerator<Object[]>(Linq4j.transform(rows, rowConverter)) {
-      @Override
-      public void close() {
+      @Override public void close() {
         super.close();
         Interpreter.this.close();
       }
     };
   }
 
-  private Function1<Row, Object[]> rowConverter = new Function1<Row, Object[]>() {
-    @Override
-    public Object[] apply(Row row) {
-      return row.getValues();
-    }
-  };
+  private Function1<Row, Object[]> rowConverter =
+    new Function1<Row, Object[]>() {
+      @Override public Object[] apply(Row row) {
+        return row.getValues();
+      }
+    };
 
   private void start() {
     // We rely on the nodes being ordered leaves first.
@@ -287,7 +286,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
     if (rel instanceof TableScan) {
       sink = new EnumerableProxySink();
     } else {
-      final ArrayDeque<Row> queue = new ArrayDeque<Row>(1);
+      final ArrayDeque<Row> queue = new ArrayDeque<>(1);
       sink = new ListSink(queue);
     }
     NodeInfo nodeInfo = new NodeInfo(rel, sink);
@@ -319,36 +318,31 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
    * A sink that just proxies for an {@link org.apache.calcite.linq4j.Enumerable}. As such, its
    * not really a "sink" but instead just a thin layer for the {@link EnumerableProxySource} to
    * get an enumerator.
-   * <p>
-   * It can be little bit slower than the {@link Interpreter.ListSink} when trying to iterate
+   *
+   * <p>It can be a little bit slower than the {@link Interpreter.ListSink} when trying to iterate
    * over the elements of the enumerable, unless the enumerable is backed by an in-memory cache
    * of the rows.
-   * </p>
    */
   private static class EnumerableProxySink implements Sink {
-
     private Enumerable<Row> enumerable;
 
-    @Override
-    public void send(Row row) throws InterruptedException {
-      throw new UnsupportedOperationException("Row are only added through the enumerable passed "
-                                              + "in through #setSourceEnumerable()!");
+    @Override public void send(Row row) throws InterruptedException {
+      throw new UnsupportedOperationException("Rows are only added through the "
+          + "enumerable passed in through #setSourceEnumerable()!");
     }
 
-    @Override
-    public void end() throws InterruptedException {
+    @Override public void end() throws InterruptedException {
       // noop
     }
 
-    @Override
-    public void setSourceEnumerable(Enumerable<Row> enumerable) {
+    @Override public void setSourceEnumerable(Enumerable<Row> enumerable) {
       this.enumerable = enumerable;
     }
   }
 
   /**
    * A {@link Source} that is just backed by an {@link Enumerator}. The {@link Enumerator} is closed
-   * when it is finished or by calling {@link #close()}
+   * when it is finished or by calling {@link #close()}.
    */
   private static class EnumerableProxySource implements Source {
 
@@ -359,12 +353,11 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
       this.source = sink;
     }
 
-    @Override
-    public Row receive() {
+    @Override public Row receive() {
       if (enumerator == null) {
         enumerator = source.enumerable.enumerator();
-        assert enumerator != null : "Sink did not set enumerable before source was asked for "
-                                    + "a row!";
+        assert enumerator != null
+            : "Sink did not set enumerable before source was asked for a row!";
       }
       if (enumerator.moveNext()) {
         return enumerator.current();
@@ -375,8 +368,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
       return null;
     }
 
-    @Override
-    public void close() {
+    @Override public void close() {
       if (this.enumerator != null) {
         this.enumerator.close();
       }
@@ -398,12 +390,11 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
     public void end() throws InterruptedException {
     }
 
-    @Override
-    public void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException {
+    @Override public void setSourceEnumerable(Enumerable<Row> enumerable)
+        throws InterruptedException {
       // just copy over the source into the local list
-      Enumerator<Row> enumerator = enumerable.enumerator();
-      Row row;
-      while (!enumerator.moveNext()) {
+      final Enumerator<Row> enumerator = enumerable.enumerator();
+      while (enumerator.moveNext()) {
         this.send(enumerator.current());
       }
       enumerator.close();
@@ -426,8 +417,7 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
       }
     }
 
-    @Override
-    public void close() {
+    @Override public void close() {
       // noop
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/core/src/main/java/org/apache/calcite/interpreter/Sink.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Sink.java b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
index 9e49ee5..6056b96 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Sink.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Sink.java
@@ -28,6 +28,8 @@ public interface Sink {
 
   void end() throws InterruptedException;
 
+  /** This method is temporary. It will be removed without notice. */
+  @Deprecated
   void setSourceEnumerable(Enumerable<Row> enumerable) throws InterruptedException;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index b8eacdd..5a6291b 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -71,15 +71,17 @@ public class StreamTest {
       + "         },\n"
       + "         factory: '" + clazz.getName() + "'\n"
       + "       } ]\n"
-      + "     }\n";
+      + "     }";
   }
 
   public static final String STREAM_MODEL = "{\n"
       + "  version: '1.0',\n"
       + "  defaultSchema: 'foodmart',\n"
       + "   schemas: [\n"
-      + schemaFor(STREAM_SCHEMA_NAME, OrdersStreamTableFactory.class) + ",\n"
+      + schemaFor(STREAM_SCHEMA_NAME, OrdersStreamTableFactory.class)
+      + ",\n"
       + schemaFor(INFINITE_STREAM_SCHEMA_NAME, InfiniteOrdersStreamTableFactory.class)
+      + "\n"
       + "   ]\n"
       + "}";
 
@@ -196,16 +198,18 @@ public class StreamTest {
   }
 
   /**
-   * Regression test for CALCITE-809
+   * Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-809">[CALCITE-809]
+   * TableScan does not support large/infinite scans</a>.
    */
   @Test public void testInfiniteStreamsDoNotBufferInMemory() {
     CalciteAssert.model(STREAM_MODEL)
-                 .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
-                 .query("select stream * from orders")
-                 .limit(100)
-                 .explainContains("EnumerableInterpreter\n"
-                                  + "  BindableTableScan(table=[[]])")
-                 .returnsCount(100);
+        .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
+        .query("select stream * from orders")
+        .limit(100)
+        .explainContains("EnumerableInterpreter\n"
+            + "  BindableTableScan(table=[[]])")
+        .returnsCount(100);
   }
 
   private Function<ResultSet, Void> startsWith(String... rows) {
@@ -240,15 +244,14 @@ public class StreamTest {
     protected final RelProtoDataType protoRowType = new RelProtoDataType() {
       public RelDataType apply(RelDataTypeFactory a0) {
         return a0.builder()
-                 .add("ROWTIME", SqlTypeName.TIMESTAMP)
-                 .add("ID", SqlTypeName.INTEGER)
-                 .add("PRODUCT", SqlTypeName.VARCHAR, 10)
-                 .add("UNITS", SqlTypeName.INTEGER)
-                 .build();
+            .add("ROWTIME", SqlTypeName.TIMESTAMP)
+            .add("ID", SqlTypeName.INTEGER)
+            .add("PRODUCT", SqlTypeName.VARCHAR, 10)
+            .add("UNITS", SqlTypeName.INTEGER)
+            .build();
       }
     };
 
-
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
       return protoRowType.apply(typeFactory);
     }
@@ -300,8 +303,7 @@ public class StreamTest {
       return Linq4j.asEnumerable(rows);
     }
 
-    @Override
-    public Table stream() {
+    @Override public Table stream() {
       return new OrdersTable(rows);
     }
   }
@@ -316,48 +318,47 @@ public class StreamTest {
     }
 
     public Table create(SchemaPlus schema, String name,
-      Map<String, Object> operand, RelDataType rowType) {
+        Map<String, Object> operand, RelDataType rowType) {
       return new InfiniteOrdersTable();
     }
   }
 
-  public static final Function0<Object[]> ROW_GENERATOR = new Function0() {
-    private int counter = 0;
-    private Iterator<String> items = Iterables.cycle("paint", "paper", "brush").iterator();
-
-    @Override
-    public Object[] apply() {
-      return new Object[]{System.currentTimeMillis(), counter++, items.next(), 10};
-    }
-  };
+  public static final Function0<Object[]> ROW_GENERATOR =
+      new Function0<Object[]>() {
+        private int counter = 0;
+        private Iterator<String> items =
+            Iterables.cycle("paint", "paper", "brush").iterator();
 
+        @Override public Object[] apply() {
+          return new Object[]{System.currentTimeMillis(), counter++, items.next(), 10};
+        }
+      };
 
   /**
    * Table representing an infinitely larger ORDERS stream.
    */
   public static class InfiniteOrdersTable extends BaseOrderStreamTable {
-
     public Enumerable<Object[]> scan(DataContext root) {
       return Linq4j.asEnumerable(new Iterable<Object[]>() {
-        @Override
-        public Iterator<Object[]> iterator() {
+        @Override public Iterator<Object[]> iterator() {
           return new Iterator<Object[]>() {
-            @Override
             public boolean hasNext() {
               return true;
             }
 
-            @Override
             public Object[] next() {
               return ROW_GENERATOR.apply();
             }
+
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
           };
         }
       });
     }
 
-    @Override
-    public Table stream() {
+    @Override public Table stream() {
       return this;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
index 043f094..c4d003b 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/DelegatingEnumerator.java
@@ -17,12 +17,12 @@
 package org.apache.calcite.linq4j;
 
 /**
- * Simple enumerator that just delegates all calls to the passed enumerator
+ * Simple enumerator that just delegates all calls to the passed enumerator.
+ *
  * @param <T> type of value to return, as passed from the delegate enumerator
  */
 public class DelegatingEnumerator<T> implements Enumerator<T> {
-
-  private Enumerator<T> delegate;
+  protected final Enumerator<T> delegate;
 
   public DelegatingEnumerator(Enumerator<T> delegate) {
     this.delegate = delegate;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a3da6915/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
index eb50cee..85f8bc6 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
@@ -206,13 +206,8 @@ public abstract class Linq4j {
   }
 
   public static <T, R> Enumerator<R> transform(final Enumerator<T> enumerator,
-    final Function1<T, R> func) {
-    return new DelegatingEnumerator<R>((Enumerator<R>) enumerator) {
-      @Override
-      public R current() {
-        return func.apply((T) super.current());
-      }
-    };
+      final Function1<T, R> func) {
+    return new TransformedEnumerator<>(enumerator, func);
   }
 
   /**
@@ -732,6 +727,39 @@ public abstract class Linq4j {
     public void close() {
     }
   }
+
+  /** Enumerator that applies a transform to each value from a backing
+   * enumerator.
+   *
+   * @param <T> Element type of backing enumerator
+   * @param <R> Element type
+   */
+  private static class TransformedEnumerator<T, R> implements Enumerator<R> {
+    private final Enumerator<T> enumerator;
+    private final Function1<T, R> func;
+
+    public TransformedEnumerator(Enumerator<T> enumerator,
+        Function1<T, R> func) {
+      this.enumerator = enumerator;
+      this.func = func;
+    }
+
+    public boolean moveNext() {
+      return enumerator.moveNext();
+    }
+
+    public R current() {
+      return func.apply(enumerator.current());
+    }
+
+    public void reset() {
+      enumerator.reset();
+    }
+
+    public void close() {
+      enumerator.close();
+    }
+  }
 }
 
 // End Linq4j.java