You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/01/20 00:23:56 UTC
incubator-calcite git commit: [CALCITE-543] Implement Aggregate
(including grouping sets) in Interpreter (Jacques Nadeau)
Repository: incubator-calcite
Updated Branches:
refs/heads/master 986f99434 -> 2bbfbdab8
[CALCITE-543] Implement Aggregate (including grouping sets) in Interpreter (Jacques Nadeau)
Minor fix-up by Julian Hyde.
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/2bbfbdab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/2bbfbdab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/2bbfbdab
Branch: refs/heads/master
Commit: 2bbfbdab8b59cf26ace3431a6e00f37bc27ad3e0
Parents: 986f994
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Jan 18 16:22:07 2015 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Jan 19 15:01:35 2015 -0800
----------------------------------------------------------------------
.../calcite/interpreter/AbstractSingleNode.java | 38 ++++
.../calcite/interpreter/AggregateNode.java | 193 +++++++++++++++++++
.../org/apache/calcite/interpreter/Nodes.java | 5 +
.../org/apache/calcite/interpreter/Row.java | 50 +++++
.../apache/calcite/test/InterpreterTest.java | 49 ++++-
5 files changed, 334 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
new file mode 100644
index 0000000..ffbc46b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.rel.SingleRel;
+
+/**
+ * An interpreter node that expects exactly one incoming source relational
+ * expression.
+ *
+ * <p>The constructor asks the {@link Interpreter} for the node's single
+ * source (requested as {@code interpreter.source(rel, 0)}) and for its sink.
+ *
+ * @param <T> Type of relational expression
+ */
+abstract class AbstractSingleNode<T extends SingleRel> implements Node {
+ /** Source obtained from the interpreter for input 0 of {@link #rel}. */
+ protected final Source source;
+ /** Sink obtained from the interpreter for {@link #rel}. */
+ protected final Sink sink;
+ /** The relational expression this node was created for. */
+ protected final T rel;
+
+ /**
+ * Creates the node and looks up its source and sink.
+ *
+ * @param interpreter Interpreter that supplies the source and the sink
+ * @param rel Relational expression implemented by this node
+ */
+ public AbstractSingleNode(Interpreter interpreter, T rel) {
+ this.rel = rel;
+ this.source = interpreter.source(rel, 0);
+ this.sink = interpreter.sink(rel);
+ }
+}
+
+// End AbstractSingleNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
new file mode 100644
index 0000000..145b44e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.interpreter.Row.RowBuilder;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interpreter node that implements an
+ * {@link org.apache.calcite.rel.core.Aggregate}.
+ *
+ * <p>Every input row is routed to each grouping set, which keeps one list of
+ * accumulators per distinct key in a hash map. Output rows are sent to the
+ * sink only after the source has returned {@code null}. The only aggregate
+ * function currently implemented is {@code COUNT}.
+ */
+public class AggregateNode extends AbstractSingleNode<Aggregate> {
+  /** One entry per grouping set of the aggregate, in declaration order. */
+  private final List<Grouping> groups = Lists.newArrayList();
+
+  /** Union of all grouping sets; its members are the output group columns. */
+  private final ImmutableBitSet unionGroups;
+
+  /** Width of an output row: the group columns, then (if the aggregate has
+   * indicators) one indicator per group column, then one column per
+   * aggregate call. */
+  private final int outputRowLength;
+
+  /**
+   * Creates an AggregateNode.
+   *
+   * @param interpreter Interpreter that supplies the source and the sink
+   * @param rel Aggregate to implement
+   */
+  public AggregateNode(Interpreter interpreter, Aggregate rel) {
+    super(interpreter, rel);
+
+    ImmutableBitSet union = ImmutableBitSet.of();
+
+    if (rel.getGroupSets() != null) {
+      for (ImmutableBitSet group : rel.getGroupSets()) {
+        union = union.union(group);
+        groups.add(new Grouping(group));
+      }
+    }
+
+    this.unionGroups = union;
+    final int groupCount = unionGroups.cardinality();
+    this.outputRowLength = groupCount
+        + (rel.indicator ? groupCount : 0)
+        + rel.getAggCallList().size();
+  }
+
+  /**
+   * Consumes the whole input, feeding each row to every grouping set, then
+   * lets each grouping set write its result rows to the sink.
+   *
+   * @throws InterruptedException if receiving from the source or sending to
+   *     the sink is interrupted
+   */
+  public void run() throws InterruptedException {
+    Row r;
+    while ((r = source.receive()) != null) {
+      for (Grouping group : groups) {
+        group.send(r);
+      }
+    }
+
+    for (Grouping group : groups) {
+      group.end(sink);
+    }
+  }
+
+  /** Creates a fresh set of accumulators, one per aggregate call and in the
+   * same order as the aggregate's call list. */
+  private AccumulatorList getNewAccumList() {
+    AccumulatorList list = new AccumulatorList();
+    for (AggregateCall call : rel.getAggCallList()) {
+      list.add(getAccumulator(call));
+    }
+    return list;
+  }
+
+  /**
+   * Creates the accumulator that implements an aggregate call.
+   *
+   * @param call Aggregate call
+   * @return Accumulator for the call
+   * @throws UnsupportedOperationException if the function is not
+   *     {@code COUNT}
+   */
+  private static Accumulator getAccumulator(final AggregateCall call) {
+    String agg = call.getAggregation().getName();
+
+    if (agg.equals("COUNT")) {
+      return new Accumulator() {
+        long cnt = 0;
+
+        public void send(Row row) {
+          // A row is counted only if every argument is non-null; with no
+          // arguments (COUNT(*)) every row is counted.
+          boolean notNull = true;
+          for (Integer i : call.getArgList()) {
+            if (row.getObject(i) == null) {
+              notNull = false;
+              break;
+            }
+          }
+          if (notNull) {
+            cnt++;
+          }
+        }
+
+        public Object end() {
+          return cnt;
+        }
+      };
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("Aggregate doesn't currently support "
+              + "the %s aggregate function.", agg));
+    }
+  }
+
+  /**
+   * Internal class to track groupings: the accumulators of one grouping set,
+   * keyed by the values of its grouping columns.
+   */
+  private class Grouping {
+    private final ImmutableBitSet grouping;
+    private final Map<Row, AccumulatorList> accum = Maps.newHashMap();
+
+    private Grouping(ImmutableBitSet grouping) {
+      this.grouping = grouping;
+    }
+
+    /** Adds an input row to the accumulators of the group it belongs to,
+     * creating that group on first sight. */
+    public void send(Row row) {
+      // The key stores every grouping column at its input position, because
+      // emit() reads it back by that position. It must therefore be as wide
+      // as the input row: sizing it by grouping.cardinality() overflows as
+      // soon as a grouping column's index is not less than the number of
+      // grouping columns (for example, group set {2}).
+      RowBuilder builder = Row.newBuilder(row.size());
+      for (Integer i : grouping) {
+        builder.set(i, row.getObject(i));
+      }
+      Row key = builder.build();
+
+      AccumulatorList list = accum.get(key);
+      if (list == null) {
+        list = getNewAccumList();
+        accum.put(key, list);
+      }
+      list.send(row);
+    }
+
+    /**
+     * Sends one output row per group to the sink.
+     *
+     * @throws InterruptedException if sending to the sink is interrupted
+     */
+    public void end(Sink sink) throws InterruptedException {
+      if (accum.isEmpty() && grouping.cardinality() == 0) {
+        // The empty grouping set (a global aggregate) yields exactly one row
+        // even when there was no input, e.g. COUNT(*) = 0.
+        emit(sink, null, getNewAccumList());
+        return;
+      }
+      for (Map.Entry<Row, AccumulatorList> e : accum.entrySet()) {
+        emit(sink, e.getKey(), e.getValue());
+      }
+    }
+
+    /** Builds the output row of one group and sends it. {@code key} is
+     * only read for columns in this grouping set, so it may be null when
+     * the grouping set is empty. */
+    private void emit(Sink sink, Row key, AccumulatorList list)
+        throws InterruptedException {
+      final RowBuilder rb = Row.newBuilder(outputRowLength);
+      final int groupCount = unionGroups.cardinality();
+      int index = 0;
+      for (Integer groupPos : unionGroups) {
+        final boolean member = grouping.get(groupPos);
+        if (member) {
+          rb.set(index, key.getObject(groupPos));
+        }
+        if (rel.indicator) {
+          // true for columns of this grouping set, false (rather than a
+          // null left by omission) for the other columns of the union.
+          rb.set(groupCount + index, member);
+        }
+        index++;
+      }
+
+      list.end(rb);
+
+      sink.send(rb.build());
+    }
+  }
+
+  /**
+   * A list of accumulators used during grouping.
+   */
+  private static class AccumulatorList extends ArrayList<Accumulator> {
+    /** Passes the row to every accumulator. */
+    public void send(Row row) {
+      for (Accumulator a : this) {
+        a.send(row);
+      }
+    }
+
+    /** Writes the accumulators' results into the last {@code size()}
+     * columns of the row being built. */
+    public void end(RowBuilder r) {
+      for (int accIndex = 0, rowIndex = r.size() - size();
+          rowIndex < r.size(); rowIndex++, accIndex++) {
+        r.set(rowIndex, get(accIndex).end());
+      }
+    }
+  }
+
+  /**
+   * Defines function implementation for
+   * things like {@code count()} and {@code sum()}.
+   */
+  private interface Accumulator {
+    /** Accumulates one input row. */
+    void send(Row row);
+
+    /** Returns the aggregated value. */
+    Object end();
+  }
+}
+
+// End AggregateNode.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
index 4f4cfd6..b09e7c3 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java
@@ -21,6 +21,7 @@ import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
@@ -116,6 +117,10 @@ public class Nodes {
}
}
+ /** Creates an {@link AggregateNode} for the aggregate and stores it in
+ * {@code node}. */
+ public void visit(Aggregate agg) {
+ node = new AggregateNode(interpreter, agg);
+ }
+
public void visit(Filter filter) {
node = new FilterNode(interpreter, filter);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/main/java/org/apache/calcite/interpreter/Row.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Row.java b/core/src/main/java/org/apache/calcite/interpreter/Row.java
index acead37..3f51c87 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Row.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Row.java
@@ -55,6 +55,11 @@ public class Row {
return new Row(new Object[] {value0, value1, value2});
}
+ /** Creates a Row with variable number of values.
+ *
+ * <p>NOTE(review): the varargs array is passed straight to the {@code Row}
+ * constructor; whether that constructor copies it is not visible here, so
+ * callers passing an explicit array should not modify it afterwards. */
+ public static Row of(Object...values) {
+ return new Row(values);
+ }
+
@Override public int hashCode() {
return Arrays.hashCode(values);
}
@@ -77,6 +82,51 @@ public class Row {
Object[] getValues() {
return values;
}
+
+ /** Returns the number of values in this row. */
+ public int size() {
+ return values.length;
+ }
+
+ /**
+ * Creates a RowBuilder object that eases creation of a new row.
+ *
+ * <p>The builder starts with {@code size} columns, all {@code null}.
+ *
+ * @param size Number of columns in output data.
+ * @return New RowBuilder object.
+ */
+ public static RowBuilder newBuilder(int size) {
+ return new RowBuilder(size);
+ }
+
+ /**
+ * Utility class to build row objects.
+ *
+ * <p>The number of columns is fixed when the builder is created; columns
+ * that are never set remain {@code null}.
+ */
+ public static class RowBuilder {
+ /** Column values collected so far; its length is the row width. */
+ Object[] values;
+
+ /** Creates a builder for a row of {@code size} columns. */
+ private RowBuilder(int size) {
+ values = new Object[size];
+ }
+
+ /**
+ * Sets the value of a particular column.
+ * @param index Zero-indexed position of value.
+ * @param value Desired column value.
+ */
+ public void set(int index, Object value) {
+ values[index] = value;
+ }
+
+ /** Returns a Row holding the values set so far.
+ *
+ * <p>NOTE(review): the builder's own array is handed to the {@code Row}
+ * constructor. Unless that constructor copies (not visible here), calling
+ * {@link #set} after {@code build()} would also change the built row. */
+ public Row build() {
+ return new Row(values);
+ }
+
+ /** Returns the number of columns of the row being built. */
+ public int size() {
+ return values.length;
+ }
+ }
+
+
}
// End Row.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/2bbfbdab/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
index 6a89061..77a9311 100644
--- a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
@@ -35,6 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -105,11 +106,26 @@ public class InterpreterTest {
}
private static void assertRows(Interpreter interpreter, String... rows) {
+ // Ordered comparison: rows must arrive in exactly the given sequence.
+ assertRows(interpreter, false, rows);
+ }
+
+ /** Asserts that the interpreter yields the given rows, in any order. */
+ private static void assertRowsUnordered(Interpreter interpreter,
+ String... rows) {
+ assertRows(interpreter, true, rows);
+ }
+
+ /** Compares the rows produced by the interpreter, each rendered with
+ * {@link Arrays#toString(Object[])}, against the expected strings.
+ *
+ * @param unordered Whether to sort both lists before comparing them */
+ private static void assertRows(Interpreter interpreter,
+ boolean unordered, String... rows) {
final List<String> list = Lists.newArrayList();
for (Object[] row : interpreter) {
list.add(Arrays.toString(row));
}
- assertThat(list, equalTo(Arrays.asList(rows)));
+ // Copy the expected rows: sorting an Arrays.asList view would write
+ // through and reorder the caller's array in place.
+ final List<String> expected = Lists.newArrayList(rows);
+ if (unordered) {
+ Collections.sort(list);
+ Collections.sort(expected);
+ }
+ assertThat(list, equalTo(expected));
}
/** Tests executing a simple plan using an interpreter. */
@@ -148,6 +164,37 @@ public class InterpreterTest {
"[6, George]");
}
+ /** Tests executing an aggregate without a GROUP BY clause
+ * ({@code count(*)} over the whole "beatles" table) using an interpreter. */
+ @Test public void testAggregate() throws Exception {
+ rootSchema.add("beatles", new ScannableTableTest.BeatlesTable());
+ SqlNode parse =
+ planner.parse("select count(*) from \"beatles\"");
+
+ SqlNode validate = planner.validate(parse);
+ RelNode convert = planner.convert(validate);
+
+ final Interpreter interpreter =
+ new Interpreter(new MyDataContext(planner), convert);
+ assertRows(interpreter,
+ "[4]");
+ }
+
+ /** Tests executing an aggregate with a GROUP BY clause using an
+ * interpreter. The rows are compared without regard to order. */
+ @Test public void testAggregateGroup() throws Exception {
+ rootSchema.add("beatles", new ScannableTableTest.BeatlesTable());
+ SqlNode parse =
+ planner.parse("select \"j\", count(*) from \"beatles\" group by \"j\"");
+
+ SqlNode validate = planner.validate(parse);
+ RelNode convert = planner.convert(validate);
+
+ final Interpreter interpreter =
+ new Interpreter(new MyDataContext(planner), convert);
+ assertRowsUnordered(interpreter,
+ "[George, 1]",
+ "[Paul, 1]",
+ "[John, 1]",
+ "[Ringo, 1]");
+ }
+
/** Tests executing a plan on a single-column
* {@link org.apache.calcite.schema.ScannableTable} using an interpreter. */
@Test public void testInterpretSimpleScannableTable() throws Exception {