You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/02 02:09:50 UTC

[14/18] incubator-calcite git commit: Further fix up [CALCITE-809], eliminating the proxy sink

Further fix up [CALCITE-809], eliminating the proxy sink


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/5e7d457a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/5e7d457a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/5e7d457a

Branch: refs/heads/master
Commit: 5e7d457ae82cc7a0e3bf055fa2e1f45b19186dcf
Parents: a3da691
Author: Julian Hyde <jh...@apache.org>
Authored: Mon Aug 10 17:24:09 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Sep 1 16:17:16 2015 -0700

----------------------------------------------------------------------
 .../apache/calcite/interpreter/Interpreter.java | 130 ++++++++-----------
 .../calcite/interpreter/TableScanNode.java      |  10 +-
 .../java/org/apache/calcite/linq4j/Linq4j.java  |  85 +++++-------
 .../calcite/linq4j/TransformedEnumerator.java   |  51 ++++++++
 .../apache/calcite/linq4j/test/Linq4jTest.java  |  22 ++++
 5 files changed, 161 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
index e4c2cfe..e14bffe 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java
@@ -18,18 +18,16 @@ package org.apache.calcite.interpreter;
 
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.AbstractEnumerable;
-import org.apache.calcite.linq4j.DelegatingEnumerator;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.TransformedEnumerator;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.rules.CalcSplitRule;
 import org.apache.calcite.rel.rules.FilterTableScanRule;
 import org.apache.calcite.rel.rules.ProjectTableScanRule;
@@ -92,29 +90,21 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
 
   public Enumerator<Object[]> enumerator() {
     start();
-    Sink sink = nodes.get(rootRel).sink;
-    Enumerator<Row> rows;
-    if (sink instanceof EnumerableProxySink) {
-      rows = ((EnumerableProxySink) sink).enumerable.enumerator();
+    final NodeInfo nodeInfo = nodes.get(rootRel);
+    final Enumerator<Row> rows;
+    if (nodeInfo.rowEnumerable != null) {
+      rows = nodeInfo.rowEnumerable.enumerator();
     } else {
-      final ArrayDeque<Row> queue = ((ListSink) sink).list;
-      rows = Linq4j.asEnumerable(queue).enumerator();
+      final ArrayDeque<Row> queue = ((ListSink) nodeInfo.sink).list;
+      rows = Linq4j.iterableEnumerator(queue);
     }
 
-    return new DelegatingEnumerator<Object[]>(Linq4j.transform(rows, rowConverter)) {
-      @Override public void close() {
-        super.close();
-        Interpreter.this.close();
-      }
-    };
-  }
-
-  private Function1<Row, Object[]> rowConverter =
-    new Function1<Row, Object[]>() {
-      @Override public Object[] apply(Row row) {
+    return new TransformedEnumerator<Row, Object[]>(rows) {
+      protected Object[] transform(Row row) {
         return row.getValues();
       }
     };
+  }
 
   private void start() {
     // We rely on the nodes being ordered leaves first.
@@ -259,15 +249,16 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
 
   public Source source(RelNode rel, int ordinal) {
     final RelNode input = getInput(rel, ordinal);
-    final NodeInfo x = nodes.get(input);
-    if (x == null) {
+    final NodeInfo nodeInfo = nodes.get(input);
+    if (nodeInfo == null) {
       throw new AssertionError("should be registered: " + rel);
     }
-    Sink sink = x.sink;
+    if (nodeInfo.rowEnumerable != null) {
+      return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator());
+    }
+    Sink sink = nodeInfo.sink;
     if (sink instanceof ListSink) {
-      return new ListSource((ListSink) x.sink);
-    } else if (sink instanceof EnumerableProxySink) {
-      return new EnumerableProxySource((EnumerableProxySink) sink);
+      return new ListSource((ListSink) nodeInfo.sink);
     }
     throw new IllegalStateException(
       "Got a sink " + sink + " to which there is no match source type!");
@@ -281,19 +272,39 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
     return rel.getInput(ordinal);
   }
 
+  /**
+   * Creates a Sink for a relational expression to write into.
+   *
+   * <p>This method is generally called from the constructor of a {@link Node}.
+   * But a constructor could instead call
+   * {@link #enumerable(RelNode, Enumerable)}.
+   *
+   * @param rel Relational expression
+   * @return Sink
+   */
   public Sink sink(RelNode rel) {
-    final Sink sink;
-    if (rel instanceof TableScan) {
-      sink = new EnumerableProxySink();
-    } else {
-      final ArrayDeque<Row> queue = new ArrayDeque<>(1);
-      sink = new ListSink(queue);
-    }
-    NodeInfo nodeInfo = new NodeInfo(rel, sink);
+    final ArrayDeque<Row> queue = new ArrayDeque<>(1);
+    final Sink sink = new ListSink(queue);
+    NodeInfo nodeInfo = new NodeInfo(rel, sink, null);
     nodes.put(rel, nodeInfo);
     return sink;
   }
 
+  /** Tells the interpreter that a given relational expression wishes to
+   * give its output as an enumerable.
+   *
+   * <p>This is as opposed to the norm, where a relational expression calls
+   * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that
+   * sink.
+   *
+   * @param rel Relational expression
+   * @param rowEnumerable Contents of relational expression
+   */
+  public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) {
+    NodeInfo nodeInfo = new NodeInfo(rel, null, rowEnumerable);
+    nodes.put(rel, nodeInfo);
+  }
+
   public Context createContext() {
     return new Context(dataContext);
   }
@@ -306,37 +317,14 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
   private static class NodeInfo {
     final RelNode rel;
     final Sink sink;
+    final Enumerable<Row> rowEnumerable;
     Node node;
 
-    public NodeInfo(RelNode rel, Sink sink) {
+    public NodeInfo(RelNode rel, Sink sink, Enumerable<Row> rowEnumerable) {
       this.rel = rel;
       this.sink = sink;
-    }
-  }
-
-  /**
-   * A sink that just proxies for an {@link org.apache.calcite.linq4j.Enumerable}. As such, its
-   * not really a "sink" but instead just a thin layer for the {@link EnumerableProxySource} to
-   * get an enumerator.
-   *
-   * <p>It can be little bit slower than the {@link Interpreter.ListSink} when trying to iterate
-   * over the elements of the enumerable, unless the enumerable is backed by an in-memory cache
-   * of the rows.
-   */
-  private static class EnumerableProxySink implements Sink {
-    private Enumerable<Row> enumerable;
-
-    @Override public void send(Row row) throws InterruptedException {
-      throw new UnsupportedOperationException("Rows are only added through the "
-          + "enumerable passed in through #setSourceEnumerable()!");
-    }
-
-    @Override public void end() throws InterruptedException {
-      // noop
-    }
-
-    @Override public void setSourceEnumerable(Enumerable<Row> enumerable) {
-      this.enumerable = enumerable;
+      this.rowEnumerable = rowEnumerable;
+      assert (sink != null) != (rowEnumerable != null) : "one or the other";
     }
   }
 
@@ -344,34 +332,24 @@ public class Interpreter extends AbstractEnumerable<Object[]> {
    * A {@link Source} that is just backed by an {@link Enumerator}. The {@link Enumerator} is closed
    * when it is finished or by calling {@link #close()}.
    */
-  private static class EnumerableProxySource implements Source {
+  private static class EnumeratorSource implements Source {
+    private final Enumerator<Row> enumerator;
 
-    private Enumerator<Row> enumerator;
-    private final EnumerableProxySink source;
-
-    public EnumerableProxySource(EnumerableProxySink sink) {
-      this.source = sink;
+    public EnumeratorSource(final Enumerator<Row> enumerator) {
+      this.enumerator = Preconditions.checkNotNull(enumerator);
     }
 
     @Override public Row receive() {
-      if (enumerator == null) {
-        enumerator = source.enumerable.enumerator();
-        assert enumerator != null
-            : "Sink did not set enumerable before source was asked for a row!";
-      }
       if (enumerator.moveNext()) {
         return enumerator.current();
       }
       // close the enumerator once we have gone through everything
       enumerator.close();
-      this.enumerator = null;
       return null;
     }
 
     @Override public void close() {
-      if (this.enumerator != null) {
-        this.enumerator.close();
-      }
+      enumerator.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
index 2824b37..c88627b 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java
@@ -59,19 +59,13 @@ import static org.apache.calcite.util.Static.RESOURCE;
  * {@link org.apache.calcite.rel.core.TableScan}.
  */
 public class TableScanNode implements Node {
-  private final Sink sink;
-  private final Enumerable<Row> enumerable;
-
   private TableScanNode(Interpreter interpreter, TableScan rel,
       Enumerable<Row> enumerable) {
-    this.enumerable = enumerable;
-    this.sink = interpreter.sink(rel);
+    interpreter.enumerable(rel, enumerable);
   }
 
-
   public void run() throws InterruptedException {
-    sink.setSourceEnumerable(enumerable);
-    sink.end();
+    // nothing to do
   }
 
   /** Creates a TableScanNode.

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
index 85f8bc6..da46850 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
@@ -102,7 +102,7 @@ public abstract class Linq4j {
    * @return Iterator
    */
   public static <T> Iterator<T> enumeratorIterator(Enumerator<T> enumerator) {
-    return new EnumeratorIterator<T>(enumerator);
+    return new EnumeratorIterator<>(enumerator);
   }
 
   /**
@@ -120,7 +120,7 @@ public abstract class Linq4j {
           (Enumerable) iterable;
       return enumerable.enumerator();
     }
-    return new IterableEnumerator<T>(iterable);
+    return new IterableEnumerator<>(iterable);
   }
 
   /**
@@ -132,7 +132,7 @@ public abstract class Linq4j {
    * @return enumerable
    */
   public static <T> Enumerable<T> asEnumerable(final List<T> list) {
-    return new ListEnumerable<T>(list);
+    return new ListEnumerable<>(list);
   }
 
   /**
@@ -151,7 +151,7 @@ public abstract class Linq4j {
       //noinspection unchecked
       return asEnumerable((List) collection);
     }
-    return new CollectionEnumerable<T>(collection);
+    return new CollectionEnumerable<>(collection);
   }
 
   /**
@@ -170,7 +170,7 @@ public abstract class Linq4j {
       //noinspection unchecked
       return asEnumerable((Collection) iterable);
     }
-    return new IterableEnumerable<T>(iterable);
+    return new IterableEnumerable<>(iterable);
   }
 
   /**
@@ -182,7 +182,7 @@ public abstract class Linq4j {
    * @return enumerable
    */
   public static <T> Enumerable<T> asEnumerable(final T[] ts) {
-    return new ListEnumerable<T>(Arrays.asList(ts));
+    return new ListEnumerable<>(Arrays.asList(ts));
   }
 
   /**
@@ -202,12 +202,24 @@ public abstract class Linq4j {
   }
 
   private static <V> Enumerator<V> listEnumerator(List<? extends V> list) {
-    return new ListEnumerator<V>(list);
+    return new ListEnumerator<>(list);
   }
 
-  public static <T, R> Enumerator<R> transform(final Enumerator<T> enumerator,
-      final Function1<T, R> func) {
-    return new TransformedEnumerator<>(enumerator, func);
+  /** Applies a function to each element of an Enumerator.
+   *
+   * @param enumerator Backing enumerator
+   * @param func Transform function
+   * @param <F> Backing element type
+   * @param <E> Element type
+   * @return Enumerator
+   */
+  public static <F, E> Enumerator<E> transform(Enumerator<F> enumerator,
+      final Function1<F, E> func) {
+    return new TransformedEnumerator<F, E>(enumerator) {
+      protected E transform(F from) {
+        return func.apply(from);
+      }
+    };
   }
 
   /**
@@ -225,7 +237,7 @@ public abstract class Linq4j {
    * query operators to be invoked on collections
    * (including {@link java.util.List} and {@link java.util.Set}) by supplying
    * the necessary type information. For example, {@link ArrayList} does not
-   * implement {@link Enumerable}&lt;T&gt;, but you can invoke
+   * implement {@link Enumerable}&lt;F&gt;, but you can invoke
    *
    * <blockquote><code>Linq4j.cast(list, Integer.class)</code></blockquote>
    *
@@ -266,7 +278,7 @@ public abstract class Linq4j {
    * query operators to be invoked on collections
    * (including {@link java.util.List} and {@link java.util.Set}) by supplying
    * the necessary type information. For example, {@link ArrayList} does not
-   * implement {@link Enumerable}&lt;T&gt;, but you can invoke
+   * implement {@link Enumerable}&lt;F&gt;, but you can invoke
    *
    * <blockquote><code>Linq4j.ofType(list, Integer.class)</code></blockquote>
    *
@@ -304,7 +316,7 @@ public abstract class Linq4j {
    * @return Singleton enumerator
    */
   public static <T> Enumerator<T> singletonEnumerator(T element) {
-    return new SingletonEnumerator<T>(element);
+    return new SingletonEnumerator<>(element);
   }
 
   /**
@@ -315,7 +327,7 @@ public abstract class Linq4j {
    * @return Singleton enumerator
    */
   public static <T> Enumerator<T> singletonNullEnumerator() {
-    return new SingletonNullEnumerator<T>();
+    return new SingletonNullEnumerator<>();
   }
 
   /**
@@ -353,7 +365,7 @@ public abstract class Linq4j {
    */
   public static <E> Enumerable<E> concat(
       final List<Enumerable<E>> enumerableList) {
-    return new CompositeEnumerable<E>(enumerableList);
+    return new CompositeEnumerable<>(enumerableList);
   }
 
   /**
@@ -379,7 +391,7 @@ public abstract class Linq4j {
    */
   public static <T> Enumerator<List<T>> product(
       List<Enumerator<T>> enumerators) {
-    return new CartesianProductEnumerator<T>(enumerators);
+    return new CartesianProductEnumerator<>(enumerators);
   }
 
   /** Returns the cartesian product of an iterable of iterables. */
@@ -392,7 +404,7 @@ public abstract class Linq4j {
           enumerators.add(iterableEnumerator(iterable));
         }
         return enumeratorIterator(
-            new CartesianProductEnumerator<T>(enumerators));
+            new CartesianProductEnumerator<>(enumerators));
       }
     };
   }
@@ -599,7 +611,7 @@ public abstract class Linq4j {
     @Override public Enumerator<T> enumerator() {
       if (iterable instanceof RandomAccess) {
         //noinspection unchecked
-        return new ListEnumerator<T>((List) iterable);
+        return new ListEnumerator<>((List) iterable);
       }
       return super.enumerator();
     }
@@ -613,7 +625,7 @@ public abstract class Linq4j {
       if (count >= list.size()) {
         return Linq4j.emptyEnumerable();
       }
-      return new ListEnumerable<T>(list.subList(count, list.size()));
+      return new ListEnumerable<>(list.subList(count, list.size()));
     }
 
     @Override public Enumerable<T> take(int count) {
@@ -621,7 +633,7 @@ public abstract class Linq4j {
       if (count >= list.size()) {
         return this;
       }
-      return new ListEnumerable<T>(list.subList(0, count));
+      return new ListEnumerable<>(list.subList(0, count));
     }
 
     @Override public T elementAt(int index) {
@@ -727,39 +739,6 @@ public abstract class Linq4j {
     public void close() {
     }
   }
-
-  /** Enumerator that applies a transform to each value from a backing
-   * enumerator.
-   *
-   * @param <T> Element type of backing enumerator
-   * @param <R> Element type
-   */
-  private static class TransformedEnumerator<T, R> implements Enumerator<R> {
-    private final Enumerator<T> enumerator;
-    private final Function1<T, R> func;
-
-    public TransformedEnumerator(Enumerator<T> enumerator,
-        Function1<T, R> func) {
-      this.enumerator = enumerator;
-      this.func = func;
-    }
-
-    public boolean moveNext() {
-      return enumerator.moveNext();
-    }
-
-    public R current() {
-      return func.apply(enumerator.current());
-    }
-
-    public void reset() {
-      enumerator.reset();
-    }
-
-    public void close() {
-      enumerator.close();
-    }
-  }
 }
 
 // End Linq4j.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java b/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java
new file mode 100644
index 0000000..839c338
--- /dev/null
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/TransformedEnumerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j;
+
+/** Enumerator that applies a transform to each value from a backing
+ * enumerator.
+ *
+ * @param <F> Element type of backing enumerator
+ * @param <E> Element type
+ */
+public abstract class TransformedEnumerator<F, E> implements Enumerator<E> {
+  protected final Enumerator<F> enumerator;
+
+  public TransformedEnumerator(Enumerator<F> enumerator) {
+    this.enumerator = enumerator;
+  }
+
+  protected abstract E transform(F from);
+
+  public boolean moveNext() {
+    return enumerator.moveNext();
+  }
+
+  public E current() {
+    return transform(enumerator.current());
+  }
+
+  public void reset() {
+    enumerator.reset();
+  }
+
+  public void close() {
+    enumerator.close();
+  }
+}
+
+// End TransformedEnumerator.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/5e7d457a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
----------------------------------------------------------------------
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
index 338251b..dfecf68 100644
--- a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jTest.java
@@ -855,6 +855,28 @@ public class Linq4jTest {
     assertThat(enumerator.moveNext(), is(false));
   }
 
+  @Test public void testTransformEnumerator() {
+    final List<String> strings = Arrays.asList("one", "two", "three");
+    final Function1<String, Integer> func = new Function1<String, Integer>() {
+      public Integer apply(String a0) {
+        return a0.length();
+      }
+    };
+    final Enumerator<Integer> enumerator =
+        Linq4j.transform(Linq4j.enumerator(strings), func);
+    assertThat(enumerator.moveNext(), is(true));
+    assertThat(enumerator.current(), is(3));
+    assertThat(enumerator.moveNext(), is(true));
+    assertThat(enumerator.current(), is(3));
+    assertThat(enumerator.moveNext(), is(true));
+    assertThat(enumerator.current(), is(5));
+    assertThat(enumerator.moveNext(), is(false));
+
+    final Enumerator<Integer> enumerator2 =
+        Linq4j.transform(Linq4j.<String>emptyEnumerator(), func);
+    assertThat(enumerator2.moveNext(), is(false));
+  }
+
   @Test public void testCast() {
     final List<Number> numbers = Arrays.asList((Number) 2, null, 3.14, 5);
     final Enumerator<Integer> enumerator =