You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/01/20 00:23:56 UTC

incubator-calcite git commit: [CALCITE-543] Implement Aggregate (including grouping sets) in Interpreter (Jacques Nadeau)

Repository: incubator-calcite
Updated Branches:
  refs/heads/master 986f99434 -> 2bbfbdab8


[CALCITE-543] Implement Aggregate (including grouping sets) in Interpreter (Jacques Nadeau)

Minor fix-up by Julian Hyde.


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/2bbfbdab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/2bbfbdab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/2bbfbdab

Branch: refs/heads/master
Commit: 2bbfbdab8b59cf26ace3431a6e00f37bc27ad3e0
Parents: 986f994
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Jan 18 16:22:07 2015 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Jan 19 15:01:35 2015 -0800

----------------------------------------------------------------------
 .../calcite/interpreter/AbstractSingleNode.java |  38 ++++
 .../calcite/interpreter/AggregateNode.java      | 193 +++++++++++++++++++
 .../org/apache/calcite/interpreter/Nodes.java   |   5 +
 .../org/apache/calcite/interpreter/Row.java     |  50 +++++
 .../apache/calcite/test/InterpreterTest.java    |  49 ++++-
 5 files changed, 334 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
new file mode 100644
index 0000000..ffbc46b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.SingleRel;
+
+/**
+ * Base class for interpreter nodes that read rows from a single input.
+ *
+ * @param <T> Type of relational expression
+ */
+abstract class AbstractSingleNode<T extends SingleRel> implements Node {
+  protected final Source source;
+  protected final Sink sink;
+  protected final T rel;
+
+  public AbstractSingleNode(Interpreter interpreter, T rel) {
+    this.rel = rel;
+    this.source = interpreter.source(rel, 0); // 0: presumably input ordinal
+    this.sink = interpreter.sink(rel);
+  }
+}
+
+// End AbstractSingleNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
new file mode 100644
index 0000000..145b44e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.interpreter.Row.RowBuilder;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interpreter node that implements an
+ * {@link org.apache.calcite.rel.core.Aggregate}, including grouping sets.
+ */
+public class AggregateNode extends AbstractSingleNode<Aggregate> {
+  private final List<Grouping> groups = Lists.newArrayList();
+  private final ImmutableBitSet unionGroups;
+  private final int outputRowLength;
+
+  public AggregateNode(Interpreter interpreter, Aggregate rel) {
+    super(interpreter, rel);
+
+    ImmutableBitSet union = ImmutableBitSet.of();
+    if (rel.getGroupSets() != null) {
+      for (ImmutableBitSet group : rel.getGroupSets()) {
+        union = union.union(group);
+        groups.add(new Grouping(group));
+      }
+    }
+
+    // Output row: group columns, then optional indicators, then aggregates.
+    this.unionGroups = union;
+    this.outputRowLength = unionGroups.cardinality()
+        + (rel.indicator ? unionGroups.cardinality() : 0)
+        + rel.getAggCallList().size();
+  }
+
+  public void run() throws InterruptedException {
+    // Feed each input row to every grouping set, then flush all results.
+    Row r;
+    while ((r = source.receive()) != null) {
+      for (Grouping group : groups) {
+        group.send(r);
+      }
+    }
+
+    for (Grouping group : groups) {
+      group.end(sink);
+    }
+  }
+
+  private AccumulatorList getNewAccumList() {
+    AccumulatorList list = new AccumulatorList();
+    for (AggregateCall call : rel.getAggCallList()) {
+      list.add(getAccumulator(call));
+    }
+    return list;
+  }
+
+  private static Accumulator getAccumulator(final AggregateCall call) {
+    String agg = call.getAggregation().getName();
+    // COUNT skips rows with a null argument; with no arguments it skips none.
+    if (agg.equals("COUNT")) {
+      return new Accumulator() {
+        long cnt = 0;
+
+        public void send(Row row) {
+          boolean notNull = true;
+          for (Integer i : call.getArgList()) {
+            if (row.getObject(i) == null) {
+              notNull = false;
+              break;
+            }
+          }
+          if (notNull) {
+            cnt++;
+          }
+        }
+
+        public Object end() {
+          return cnt;
+        }
+      };
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("Aggregate doesn't currently support "
+              + "the %s aggregate function.", agg));
+    }
+  }
+
+  /**
+   * Accumulators for one grouping set, keyed by that set's column values.
+   */
+  private class Grouping {
+    private final ImmutableBitSet grouping;
+    private final Map<Row, AccumulatorList> accum = Maps.newHashMap();
+
+    private Grouping(ImmutableBitSet grouping) {
+      this.grouping = grouping;
+    }
+
+    public void send(Row row) {
+      // Pack the key densely: slot k holds this set's k-th column value.
+      RowBuilder builder = Row.newBuilder(grouping.cardinality());
+      int keyIndex = 0;
+      for (Integer i : grouping) {
+        builder.set(keyIndex++, row.getObject(i));
+      }
+      Row key = builder.build();
+
+      if (!accum.containsKey(key)) {
+        accum.put(key, getNewAccumList());
+      }
+
+      accum.get(key).send(row);
+    }
+
+    public void end(Sink sink) throws InterruptedException {
+      for (Map.Entry<Row, AccumulatorList> e : accum.entrySet()) {
+        final Row key = e.getKey();
+        final AccumulatorList list = e.getValue();
+
+        RowBuilder rb = Row.newBuilder(outputRowLength);
+        int index = 0; // position within the union of all grouping sets
+        int keyIndex = 0; // position within this set's densely packed key
+        for (Integer groupPos : unionGroups) {
+          final boolean inGroup = grouping.get(groupPos);
+          if (inGroup) {
+            rb.set(index, key.getObject(keyIndex++));
+          }
+          if (rel.indicator) {
+            // Always write the flag so absent columns read false, not null.
+            rb.set(unionGroups.cardinality() + index, inGroup);
+          }
+          index++;
+        }
+
+        list.end(rb);
+
+        sink.send(rb.build());
+      }
+    }
+  }
+
+  /**
+   * One accumulator per aggregate call; results fill the tail of a row.
+   */
+  private static class AccumulatorList extends ArrayList<Accumulator> {
+    public void send(Row row) {
+      for (Accumulator a : this) {
+        a.send(row);
+      }
+    }
+
+    public void end(RowBuilder r) {
+      for (int accIndex = 0, rowIndex = r.size() - size();
+          rowIndex < r.size(); rowIndex++, accIndex++) {
+        r.set(rowIndex, get(accIndex).end());
+      }
+    }
+  }
+
+  /**
+   * Defines function implementation for
+   * things like {@code count()} and {@code sum()}.
+   */
+  private interface Accumulator {
+    void send(Row row);
+    Object end();
+  }
+}
+
+// End AggregateNode.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
index 4f4cfd6..b09e7c3 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Calc;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
@@ -116,6 +117,10 @@ public class Nodes {
       }
     }
 
+    public void visit(Aggregate agg) {
+      node = new AggregateNode(interpreter, agg); // interpreter node for agg
+    }
+
     public void visit(Filter filter) {
       node = new FilterNode(interpreter, filter);
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/Row.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Row.java b/core/src/main/java/org/apache/calcite/interpreter/Row.java
index acead37..3f51c87 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Row.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Row.java
@@ -55,6 +55,11 @@ public class Row {
     return new Row(new Object[] {value0, value1, value2});
   }
 
+  /** Creates a Row from any number of values; the array is not copied here. */
+  public static Row of(Object...values) {
+    return new Row(values);
+  }
+
   @Override public int hashCode() {
     return Arrays.hashCode(values);
   }
@@ -77,6 +82,51 @@ public class Row {
   Object[] getValues() {
     return values;
   }
+
+  public int size() {
+    return values.length; // number of values held by this row
+  }
+
+  /**
+   * Creates a {@link RowBuilder} with a fixed number of column slots.
+   *
+   * @param size Number of columns in output data.
+   * @return New RowBuilder object.
+   */
+  public static RowBuilder newBuilder(int size) {
+    return new RowBuilder(size);
+  }
+
+  /**
+   * Collects column values for a new {@link Row}; unset slots stay null.
+   */
+  public static class RowBuilder {
+    Object[] values; // column slots, allocated in the constructor
+
+    private RowBuilder(int size) {
+      values = new Object[size];
+    }
+
+    /**
+     * Sets the value of a particular column, replacing any earlier value.
+     * @param index Zero-indexed position of value.
+     * @param value Desired column value.
+     */
+    public void set(int index, Object value) {
+      values[index] = value;
+    }
+
+    /** Builds a Row from this builder's value array (not copied here). */
+    public Row build() {
+      return new Row(values);
+    }
+
+    public int size() {
+      return values.length; // number of column slots, set or not
+    }
+  }
+
+
 }
 
 // End Row.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
index 6a89061..77a9311 100644
--- a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
@@ -35,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -105,11 +106,26 @@ public class InterpreterTest {
   }
 
   private static void assertRows(Interpreter interpreter, String... rows) {
+    assertRows(interpreter, false, rows); // false: row order must match
+  }
+
+  private static void assertRowsUnordered(Interpreter interpreter,
+      String... rows) {
+    assertRows(interpreter, true, rows); // true: compare ignoring row order
+  }
+
+  private static void assertRows(Interpreter interpreter,
+      boolean unordered, String... rows) {
     final List<String> list = Lists.newArrayList();
     for (Object[] row : interpreter) {
       list.add(Arrays.toString(row));
     }
-    assertThat(list, equalTo(Arrays.asList(rows)));
+    final List<String> expected = Lists.newArrayList(rows); // private copy
+    if (unordered) { // compare as multisets: sort both sides first
+      Collections.sort(list);
+      Collections.sort(expected); // sorts the copy, not the caller's array
+    }
+    assertThat(list, equalTo(expected));
   }
 
   /** Tests executing a simple plan using an interpreter. */
@@ -148,6 +164,37 @@ public class InterpreterTest {
         "[6, George]");
   }
 
+  /** Tests a {@code COUNT(*)} with no GROUP BY, run by the interpreter. */
+  @Test public void testAggregate() throws Exception {
+    rootSchema.add("beatles", new ScannableTableTest.BeatlesTable());
+    final String sql = "select  count(*) from \"beatles\"";
+    final SqlNode parsed = planner.parse(sql);
+    final SqlNode validated = planner.validate(parsed);
+    final RelNode root = planner.convert(validated);
+
+    // A single output row holding the count is expected.
+    final Interpreter interpreter =
+        new Interpreter(new MyDataContext(planner), root);
+    assertRows(interpreter, "[4]");
+  }
+
+  /** Tests {@code COUNT(*)} with a single-column GROUP BY. */
+  @Test public void testAggregateGroup() throws Exception {
+    rootSchema.add("beatles", new ScannableTableTest.BeatlesTable());
+    final String sql =
+        "select \"j\", count(*) from \"beatles\" group by \"j\"";
+    final SqlNode parsed = planner.parse(sql);
+    final SqlNode validated = planner.validate(parsed);
+    final RelNode root = planner.convert(validated);
+
+    // Group order is unspecified, so compare without regard to row order.
+    final Interpreter interpreter =
+        new Interpreter(new MyDataContext(planner), root);
+    assertRowsUnordered(interpreter,
+        "[George, 1]", "[Paul, 1]",
+        "[John, 1]", "[Ringo, 1]");
+  }
+
   /** Tests executing a plan on a single-column
    * {@link org.apache.calcite.schema.ScannableTable} using an interpreter. */
   @Test public void testInterpretSimpleScannableTable() throws Exception {