You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/05/13 00:05:38 UTC
[1/3] phoenix git commit: PHOENIX-2818 Optimize ORDERED group by
Repository: phoenix
Updated Branches:
refs/heads/master d97eb4967 -> ae14e38cc
PHOENIX-2818 Optimize ORDERED group by
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d414505d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d414505d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d414505d
Branch: refs/heads/master
Commit: d414505df8afeba553734db688547c6a4e9e90d9
Parents: d97eb49
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:29:29 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:29:29 2016 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/GroupByCaseIT.java | 92 +++++++++
.../apache/phoenix/execute/AggregatePlan.java | 3 +-
.../iterate/AggregatingResultIterator.java | 4 +-
.../BaseGroupedAggregatingResultIterator.java | 3 +-
.../DistinctAggregatingResultIterator.java | 15 +-
.../FilterAggregatingResultIterator.java | 8 +-
.../OrderedAggregatingResultIterator.java | 5 +-
.../RowKeyOrderedAggregateResultIterator.java | 190 +++++++++++++++++++
.../iterate/AggregateResultScannerTest.java | 109 +----------
.../iterate/ConcatResultIteratorTest.java | 31 +--
.../iterate/MaterializedResultIterators.java | 66 +++++++
...owKeyOrderedAggregateResultIteratorTest.java | 149 +++++++++++++++
.../java/org/apache/phoenix/util/TestUtil.java | 70 +++++++
13 files changed, 596 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
index 44f43b7..b0524da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java
@@ -32,6 +32,9 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.junit.Test;
@@ -343,4 +346,93 @@ public class GroupByCaseIT extends BaseHBaseManagedTimeIT {
" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs));
}
+ @Test
+ public void testSumGroupByOrderPreservingDesc() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("CREATE TABLE GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary key (k1,k2)) split on (?,?,?)");
+ stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), PInteger.INSTANCE.toBytes(3)));
+ stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), PInteger.INSTANCE.toBytes(3)));
+ stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), PInteger.INSTANCE.toBytes(3)));
+ stmt.execute();
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 1)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 2)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 3)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 4)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 5)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 1)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 2)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 3)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 4)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 1)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 3)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 4)");
+ conn.commit();
+ String query = "SELECT k1,sum(k2) FROM GROUP_BY_DESC GROUP BY k1 ORDER BY k1 DESC";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("n", rs.getString(1));
+ assertEquals(10, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("j", rs.getString(1));
+ assertEquals(10, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals(5, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals(10, rs.getInt(2));
+ assertFalse(rs.next());
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertEquals(
+ "CLIENT PARALLEL 1-WAY REVERSE FULL SCAN OVER GROUP_BY_DESC\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs));
+ }
+
+ @Test
+ public void testAvgGroupByOrderPreserving() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("CREATE TABLE GROUP_BY_DESC (k1 char(1) not null, k2 integer not null, constraint pk primary key (k1,k2)) split on (?,?,?)");
+ stmt.setBytes(1, ByteUtil.concat(PChar.INSTANCE.toBytes("a"), PInteger.INSTANCE.toBytes(3)));
+ stmt.setBytes(2, ByteUtil.concat(PChar.INSTANCE.toBytes("j"), PInteger.INSTANCE.toBytes(3)));
+ stmt.setBytes(3, ByteUtil.concat(PChar.INSTANCE.toBytes("n"), PInteger.INSTANCE.toBytes(3)));
+ stmt.execute();
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 1)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 2)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 3)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('a', 6)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('b', 5)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 1)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 2)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 3)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('j', 10)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 1)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 3)");
+ conn.createStatement().execute("UPSERT INTO GROUP_BY_DESC VALUES('n', 2)");
+ conn.commit();
+ String query = "SELECT k1,avg(k2) FROM GROUP_BY_DESC GROUP BY k1";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals(3, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals(5, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("j", rs.getString(1));
+ assertEquals(4, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("n", rs.getString(1));
+ assertEquals(2, rs.getInt(2));
+ assertFalse(rs.next());
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertEquals(
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER GROUP_BY_DESC\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [K1]", QueryUtil.getExplainPlan(rs));
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 94d1fc8..770cf71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
import org.apache.phoenix.iterate.OffsetResultIterator;
@@ -226,7 +227,7 @@ public class AggregatePlan extends BaseQueryPlan {
aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators);
// If salted or local index we still need a merge sort as we'll potentially have multiple group by keys that aren't contiguous.
} else if (groupBy.isOrderPreserving() && !(this.getTableRef().getTable().getBucketNum() != null || this.getTableRef().getTable().getIndexType() == IndexType.LOCAL)) {
- aggResultIterator = new GroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators);
+ aggResultIterator = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
} else {
aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
index abd0545..59a89ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.iterate;
+import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -33,6 +34,7 @@ public interface AggregatingResultIterator extends ResultIterator {
* Provides a means of re-aggregating a result row. For
* scanners that need to look ahead (i.e. {@link org.apache.phoenix.iterate.OrderedAggregatingResultIterator})
* @param result the row to re-aggregate
+ * @return Aggregator[] results
*/
- void aggregate(Tuple result);
+ Aggregator[] aggregate(Tuple result);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
index 8fd36b3..84d29ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -91,10 +91,11 @@ public abstract class BaseGroupedAggregatingResultIterator implements
}
@Override
- public void aggregate(Tuple result) {
+ public Aggregator[] aggregate(Tuple result) {
Aggregator[] rowAggregators = aggregators.getAggregators();
aggregators.reset(rowAggregators);
aggregators.aggregate(rowAggregators, result);
+ return rowAggregators;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
index 1ba134b..cf3fb38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -18,17 +18,20 @@
package org.apache.phoenix.iterate;
import java.sql.SQLException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.schema.tuple.Tuple;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
/**
* Result scanner that dedups the incoming tuples to make them distinct.
* <p>
@@ -155,8 +158,8 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera
}
@Override
- public void aggregate(Tuple result) {
- delegate.aggregate(result);
+ public Aggregator[] aggregate(Tuple result) {
+ return delegate.aggregate(result);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
index 4fa2011..5fd2028 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
@@ -21,10 +21,10 @@ import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBoolean;
/**
@@ -66,8 +66,8 @@ public class FilterAggregatingResultIterator implements AggregatingResultIterat
}
@Override
- public void aggregate(Tuple result) {
- delegate.aggregate(result);
+ public Aggregator[] aggregate(Tuple result) {
+ return delegate.aggregate(result);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
index da2be48..51a7dd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.List;
import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -55,7 +56,7 @@ public class OrderedAggregatingResultIterator extends OrderedResultIterator impl
}
@Override
- public void aggregate(Tuple result) {
- getDelegate().aggregate(result);
+ public Aggregator[] aggregate(Tuple result) {
+ return getDelegate().aggregate(result);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
new file mode 100644
index 0000000..3c52e51
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIterator.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ *
+ * Client-side aggregation for row-key-ordered results. Avoids comparing the
+ * row keys of returned rows unless a scan boundary is crossed.
+ *
+ */
+public class RowKeyOrderedAggregateResultIterator extends LookAheadResultIterator implements AggregatingResultIterator {
+ private final ResultIterators resultIterators;
+ private List<PeekingResultIterator> iterators;
+ private final Aggregators aggregators;
+ private final ImmutableBytesWritable currentKey = new ImmutableBytesWritable();
+ private final ImmutableBytesWritable previousKey = new ImmutableBytesWritable();
+ private boolean traversedIterator = true;
+ private boolean nextTraversedIterators;
+ private Tuple next;
+
+ private int index;
+
+ public RowKeyOrderedAggregateResultIterator(ResultIterators iterators, Aggregators aggregators) {
+ this.resultIterators = iterators;
+ this.aggregators = aggregators;
+ }
+
+ private List<PeekingResultIterator> getIterators() throws SQLException {
+ if (iterators == null && resultIterators != null) {
+ iterators = resultIterators.getIterators();
+ }
+ return iterators;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ SQLException toThrow = null;
+ try {
+ if (resultIterators != null) {
+ resultIterators.close();
+ }
+ } catch (Exception e) {
+ toThrow = ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ if (iterators != null) {
+ for (;index < iterators.size(); index++) {
+ PeekingResultIterator iterator = iterators.get(index);
+ try {
+ iterator.close();
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
+ }
+ }
+ }
+ } finally {
+ if (toThrow != null) {
+ throw toThrow;
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void explain(List<String> planSteps) {
+ if (resultIterators != null) {
+ resultIterators.explain(planSteps);
+ }
+ }
+
+ private Tuple nextTuple() throws SQLException {
+ List<PeekingResultIterator> iterators = getIterators();
+ while (index < iterators.size()) {
+ PeekingResultIterator iterator = iterators.get(index);
+ Tuple r = iterator.peek();
+ if (r != null) {
+ return iterator.next();
+ }
+ traversedIterator = true;
+ iterator.close();
+ index++;
+ }
+ return null;
+ }
+
+ private boolean continueAggregating(Tuple previous, Tuple next) {
+ if (next == null) {
+ return false;
+ }
+ next.getKey(currentKey);
+ previous.getKey(previousKey);
+ return (currentKey.compareTo(previousKey) == 0);
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple t = super.next();
+ if (t == null) {
+ return null;
+ }
+ aggregate(t);
+ return t;
+ }
+
+ @Override
+ protected Tuple advance() throws SQLException {
+ Tuple current = this.next;
+ boolean traversedIterators = nextTraversedIterators;
+ if (current == null) {
+ current = nextTuple();
+ traversedIterators = this.traversedIterator;
+ }
+ if (current != null) {
+ Tuple previous = current;
+ Aggregator[] rowAggregators = null;
+ while (true) {
+ current = nextTuple();
+ if (!traversedIterators || !continueAggregating(previous, current)) {
+ break;
+ }
+ if (rowAggregators == null) {
+ rowAggregators = aggregate(previous);
+ }
+ aggregators.aggregate(rowAggregators, current);
+ traversedIterators = this.traversedIterator;
+ }
+ this.next = current;
+ this.nextTraversedIterators = this.traversedIterator;
+ if (rowAggregators == null) {
+ current = previous;
+ } else {
+ byte[] value = aggregators.toBytes(rowAggregators);
+ current = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(previousKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ }
+ }
+ if (current == null) {
+ close(); // Close underlying ResultIterators to free resources sooner rather than later
+ }
+ return current;
+ }
+
+ @Override
+ public String toString() {
+ return "RowKeyOrderedAggregateResultIterator [resultIterators=" + resultIterators + ", index=" + index + "]";
+ }
+
+ @Override
+ public Aggregator[] aggregate(Tuple result) {
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ aggregators.aggregate(rowAggregators, result);
+ return rowAggregators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index f53e871..791eb23 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -19,40 +19,22 @@ package org.apache.phoenix.iterate;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import java.sql.DriverManager;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.AggregationManager;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.aggregator.ClientAggregators;
-import org.apache.phoenix.expression.function.SingleAggregateFunction;
-import org.apache.phoenix.expression.function.SumAggregateFunction;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.schema.PLongColumn;
-import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.AssertResults;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -87,90 +69,9 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, PLong.INSTANCE.toBytes(2L))),
};
- PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
- PhoenixStatement statement = new PhoenixStatement(pconn);
- StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement));
- AggregationManager aggregationManager = context.getAggregationManager();
- SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() {
- @Override
- public PName getName() {
- return SINGLE_COLUMN_NAME;
- }
- @Override
- public PName getFamilyName() {
- return SINGLE_COLUMN_FAMILY_NAME;
- }
- @Override
- public int getPosition() {
- return 0;
- }
-
- @Override
- public SortOrder getSortOrder() {
- return SortOrder.getDefault();
- }
-
- @Override
- public Integer getArraySize() {
- return 0;
- }
-
- @Override
- public byte[] getViewConstant() {
- return null;
- }
-
- @Override
- public boolean isViewReferenced() {
- return false;
- }
-
- @Override
- public String getExpressionStr() {
- return null;
- }
- @Override
- public boolean isRowTimestamp() {
- return false;
- }
- @Override
- public boolean isDynamic() {
- return false;
- }
- })), null);
- aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1));
- ResultIterators iterators = new ResultIterators() {
-
- @Override
- public List<PeekingResultIterator> getIterators() throws SQLException {
- return results;
- }
-
- @Override
- public int size() {
- return results.size();
- }
-
- @Override
- public void explain(List<String> planSteps) {
- }
-
- @Override
- public List<KeyRange> getSplits() {
- return Collections.emptyList();
- }
-
- @Override
- public List<List<Scan>> getScans() {
- return Collections.emptyList();
- }
-
- @Override
- public void close() throws SQLException {
- }
-
- };
- ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());
+ ResultIterators iterators = new MaterializedResultIterators(results);
+ ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators);
AssertResults.assertResults(scanner, expectedResults);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index cf71724..67d5cd0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -56,36 +56,7 @@ public class ConcatResultIteratorTest {
new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4))),
};
final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
- ResultIterators iterators = new ResultIterators() {
-
- @Override
- public List<PeekingResultIterator> getIterators() throws SQLException {
- return results;
- }
-
- @Override
- public int size() {
- return results.size();
- }
-
- @Override
- public void explain(List<String> planSteps) {
- }
-
- @Override
- public List<KeyRange> getSplits() {
- return Collections.emptyList();
- }
-
- @Override
- public List<List<Scan>> getScans() {
- return Collections.emptyList();
- }
-
- @Override
- public void close() throws SQLException {
- }
- };
+ ResultIterators iterators = new MaterializedResultIterators(results);
Tuple[] expectedResults = new Tuple[] {
new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
new file mode 100644
index 0000000..c4b0265
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MaterializedResultIterators.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.query.KeyRange;
+
+/**
+ *
+ * ResultIterators implementation backed by an in-memory list of PeekingResultIterator
+ *
+ */
+public class MaterializedResultIterators implements ResultIterators {
+ private final List<PeekingResultIterator> results;
+
+ public MaterializedResultIterators(List<PeekingResultIterator> results) {
+ this.results = results;
+ }
+
+ @Override
+ public List<PeekingResultIterator> getIterators() throws SQLException {
+ return results;
+ }
+
+ @Override
+ public int size() {
+ return results.size();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ }
+
+ @Override
+ public List<KeyRange> getSplits() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
new file mode 100644
index 0000000..347de78
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/RowKeyOrderedAggregateResultIteratorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.AssertResults;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+public class RowKeyOrderedAggregateResultIteratorTest extends BaseConnectionlessQueryTest {
+ // Row keys used to build the fake, row-key-ordered scanner results below.
+ private final static byte[] A = Bytes.toBytes("a");
+ private final static byte[] B = Bytes.toBytes("b");
+ private final static byte[] C = Bytes.toBytes("c");
+ private final static byte[] D = Bytes.toBytes("d");
+
+ // No row key appears in more than one iterator: each row passes through unmerged.
+ @Test
+ public void testNoSpan() throws Exception {
+ Tuple[] results1 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+ };
+ Tuple[] results2 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L)))
+ };
+ Tuple[] results3 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+ new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+ };
+ final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+ ResultIterators iterators = new MaterializedResultIterators(results);
+
+ Tuple[] expectedResults = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))),
+ new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+ new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+ };
+
+ // Single SUM aggregator over SINGLE_COLUMN combines values for duplicate row keys.
+ ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+ AssertResults.assertResults(scanner, expectedResults);
+ }
+
+ // Key B spans all three iterators (2 + 3 + 4): its values must be summed into 9.
+ @Test
+ public void testSpanThree() throws Exception {
+ Tuple[] results1 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L)))
+ };
+ Tuple[] results2 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L)))
+ };
+ Tuple[] results3 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+ new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))),
+ };
+ final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+ ResultIterators iterators = new MaterializedResultIterators(results);
+
+ Tuple[] expectedResults = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(9L))),
+ new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))),
+ };
+
+ ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+ AssertResults.assertResults(scanner, expectedResults);
+ }
+
+ // Every iterator yields only key B: all values collapse into one row (2 + 3 + 4 = 9).
+ @Test
+ public void testSpanAll() throws Exception {
+ Tuple[] results1 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L)))
+ };
+ Tuple[] results2 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L)))
+ };
+ Tuple[] results3 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+ };
+ final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+ ResultIterators iterators = new MaterializedResultIterators(results);
+
+ Tuple[] expectedResults = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(9L))),
+ };
+
+ ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+ AssertResults.assertResults(scanner, expectedResults);
+ }
+
+ // Keys A and D each span two adjacent iterators (1 + 2 = 3 and 5 + 6 = 11),
+ // including a span that ends at the last iterator.
+ @Test
+ public void testSpanEnd() throws Exception {
+ Tuple[] results1 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1L))),
+ };
+ Tuple[] results2 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(2L))),
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+ new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+ new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(5L))),
+ };
+ Tuple[] results3 = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(6L))),
+ };
+ final List<PeekingResultIterator>results = Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))});
+ ResultIterators iterators = new MaterializedResultIterators(results);
+
+ Tuple[] expectedResults = new Tuple[] {
+ new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+ new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(3L))),
+ new SingleKeyValueTuple(new KeyValue(C, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(4L))),
+ new SingleKeyValueTuple(new KeyValue(D, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(11L))),
+ };
+
+ ClientAggregators aggregators = TestUtil.getSingleSumAggregator(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ ResultIterator scanner = new RowKeyOrderedAggregateResultIterator(iterators, aggregators);
+ AssertResults.assertResults(scanner, expectedResults);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d414505d/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c73c160..872555c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.util;
import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME;
import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
@@ -42,6 +44,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -55,6 +58,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.AggregationManager;
+import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
@@ -70,7 +75,10 @@ import org.apache.phoenix.expression.NotExpression;
import org.apache.phoenix.expression.OrExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.StringBasedLikeExpression;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
import org.apache.phoenix.expression.function.SubstrFunction;
+import org.apache.phoenix.expression.function.SumAggregateFunction;
import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.MultiKeyValueComparisonFilter;
import org.apache.phoenix.filter.RowKeyComparisonFilter;
@@ -79,13 +87,17 @@ import org.apache.phoenix.filter.SingleKeyValueComparisonFilter;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.LikeParseNode.LikeType;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PLongColumn;
+import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.PTableStats;
@@ -622,5 +634,63 @@ public class TestUtil {
tableNameBuilder.append(transactional ? "_TXN" : "_NON_TXN");
return tableNameBuilder.toString();
}
+
+ /**
+ * Builds a ClientAggregators holding a single SUM aggregate over
+ * SINGLE_COLUMN/SINGLE_COLUMN_FAMILY, registered on (and returned from) a
+ * throwaway statement context created against the given connection URL.
+ */
+ public static ClientAggregators getSingleSumAggregator(String url, Properties props) throws SQLException {
+ try (PhoenixConnection pconn = DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class)) {
+ // Throwaway statement/context whose only purpose is to host the AggregationManager.
+ PhoenixStatement statement = new PhoenixStatement(pconn);
+ StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement));
+ AggregationManager aggregationManager = context.getAggregationManager();
+ // Minimal PLongColumn stub that just identifies SINGLE_COLUMN in SINGLE_COLUMN_FAMILY.
+ SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() {
+ @Override
+ public PName getName() {
+ return SINGLE_COLUMN_NAME;
+ }
+ @Override
+ public PName getFamilyName() {
+ return SINGLE_COLUMN_FAMILY_NAME;
+ }
+ @Override
+ public int getPosition() {
+ return 0;
+ }
+
+ @Override
+ public SortOrder getSortOrder() {
+ return SortOrder.getDefault();
+ }
+
+ @Override
+ public Integer getArraySize() {
+ return 0;
+ }
+
+ @Override
+ public byte[] getViewConstant() {
+ return null;
+ }
+
+ @Override
+ public boolean isViewReferenced() {
+ return false;
+ }
+
+ @Override
+ public String getExpressionStr() {
+ return null;
+ }
+ @Override
+ public boolean isRowTimestamp() {
+ return false;
+ }
+ @Override
+ public boolean isDynamic() {
+ return false;
+ }
+ })), null);
+ // Register the single-function aggregator on the context, then hand it back.
+ // minNullableIndex of 1 (== list size) marks the lone SUM as non-nullable.
+ aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1));
+ ClientAggregators aggregators = aggregationManager.getAggregators();
+ return aggregators;
+ }
+ }
}
[2/3] phoenix git commit: PHOENIX-2876 Using aggregation function in
ORDER BY (Sergey Soldatov)
Posted by ja...@apache.org.
PHOENIX-2876 Using aggregation function in ORDER BY (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af8d3b65
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af8d3b65
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af8d3b65
Branch: refs/heads/master
Commit: af8d3b65c84fd57b91b99ff36de2194149c5a94e
Parents: d414505
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:32:34 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:32:34 2016 -0700
----------------------------------------------------------------------
.../phoenix/compile/AggregationManager.java | 60 ++++++++++++++++++++
.../apache/phoenix/compile/DeleteCompiler.java | 1 +
.../apache/phoenix/compile/PostDDLCompiler.java | 1 +
.../phoenix/compile/ProjectionCompiler.java | 53 +----------------
.../apache/phoenix/compile/QueryCompiler.java | 1 +
.../apache/phoenix/compile/UpsertCompiler.java | 6 +-
.../phoenix/compile/QueryCompilerTest.java | 16 +++++-
7 files changed, 86 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
index ee2497b..c8e672e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -17,7 +17,21 @@
*/
package org.apache.phoenix.compile;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
/**
*
@@ -52,4 +66,50 @@ public class AggregationManager {
public void setAggregators(ClientAggregators clientAggregator) {
this.aggregators = clientAggregator;
}
+ /**
+ * Compiles projection by:
+ * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
+ * to track how many rows have been scanned.
+ * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
+ * optimize the positional access of the aggregated value.
+ */
+ public void compile(StatementContext context, GroupByCompiler.GroupBy groupBy) throws
+ SQLException {
+ // Collect every distinct aggregate function referenced anywhere in the compiled expressions.
+ final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
+
+ Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
+ while (expressions.hasNext()) {
+ Expression expression = expressions.next();
+ expression.accept(new SingleAggregateFunctionVisitor() {
+ @Override
+ public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
+ aggFuncSet.add(function);
+ // Don't descend further: the aggregate itself is what we need.
+ return Iterators.emptyIterator();
+ }
+ });
+ }
+ // Nothing to do: no aggregate functions and no GROUP BY.
+ if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
+ return;
+ }
+ // Sort so fixed-length aggregates come first (see javadoc item 2).
+ List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
+ Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
+
+ int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+ // Ship the serialized aggregators in the scan attribute for the server side,
+ // and mirror the same list on the client-side AggregationManager.
+ context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
+ ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
+ context.getAggregationManager().setAggregators(clientAggregators);
+ }
+
+ // Returns the index of the first aggregate in the sorted list whose value may be
+ // null (nullability is checked on the aggregator for ungrouped aggregation, on the
+ // aggregator expression otherwise); returns aggFuncs.size() when none are nullable.
+ private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+ int minNullableIndex = aggFuncs.size();
+ for (int i = 0; i < aggFuncs.size(); i++) {
+ SingleAggregateFunction aggFunc = aggFuncs.get(i);
+ if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
+ minNullableIndex = i;
+ break;
+ }
+ }
+ return minNullableIndex;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 2a97686..fa3dd62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -508,6 +508,7 @@ public class DeleteCompiler {
// Ignoring ORDER BY, since with auto commit on and no limit makes no difference
SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+ context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
if (plan.getProjector().projectEveryRow()) {
projectorToBe = new RowProjector(projectorToBe,true);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index a786438..e43b596 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -253,6 +253,7 @@ public class PostDDLCompiler {
}
// Need to project all column families into the scan, since we haven't yet created our empty key value
RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
+ context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
// Explicitly project these column families and don't project the empty key value,
// since at this point we haven't added the empty key value everywhere.
if (columnFamilies != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 3cf3934..8d7d7cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -188,8 +188,8 @@ public class ProjectionCompiler {
try {
dataTable = conn.getTable(new PTableKey(tenantId, tableName));
} catch (TableNotFoundException e) {
- if (tenantId != null) {
- // Check with null tenantId
+ if (tenantId != null) {
+ // Check with null tenantId
dataTable = conn.getTable(new PTableKey(null, tableName));
}
else {
@@ -483,8 +483,6 @@ public class ProjectionCompiler {
}
}
}
-
- selectVisitor.compile();
boolean isProjectEmptyKeyValue = false;
if (isWildcard) {
projectAllColumnFamilies(table, scan);
@@ -576,18 +574,7 @@ public class ProjectionCompiler {
}
private static class SelectClauseVisitor extends ExpressionCompiler {
- private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
- int minNullableIndex = aggFuncs.size();
- for (int i = 0; i < aggFuncs.size(); i++) {
- SingleAggregateFunction aggFunc = aggFuncs.get(i);
- if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
- minNullableIndex = i;
- break;
- }
- }
- return minNullableIndex;
- }
-
+
/**
* Track whether or not the projection expression is case sensitive. We use this
* information to determine whether or not we normalize the column name passed
@@ -613,40 +600,6 @@ public class ProjectionCompiler {
reset();
}
-
- /**
- * Compiles projection by:
- * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
- * to track how many rows have been scanned.
- * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
- * optimize the positional access of the aggregated value.
- */
- private void compile() throws SQLException {
- final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
-
- Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
- while (expressions.hasNext()) {
- Expression expression = expressions.next();
- expression.accept(new SingleAggregateFunctionVisitor() {
- @Override
- public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
- aggFuncSet.add(function);
- return Iterators.emptyIterator();
- }
- });
- }
- if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
- return;
- }
- List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
- Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
-
- int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
- context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
- ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
- context.getAggregationManager().setAggregators(clientAggregators);
- }
-
@Override
public void reset() {
super.reset();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 82c9731..4488aff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -564,6 +564,7 @@ public class QueryCompiler {
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, offset, projector,
groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder);
+ context.getAggregationManager().compile(context, groupBy);
// Final step is to build the query plan
if (!asSubquery) {
int maxRows = statement.getMaxRows();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7c6347f..e2fc2ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -636,7 +636,11 @@ public class UpsertCompiler {
PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
- RowProjector aggProjectorToBe = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY);
+ StatementContext statementContext = queryPlan.getContext();
+ RowProjector aggProjectorToBe = ProjectionCompiler.compile(statementContext, select, GroupBy
+ .EMPTY_GROUP_BY);
+ statementContext.getAggregationManager().compile(queryPlan.getContext()
+ ,GroupBy.EMPTY_GROUP_BY);
if (queryPlan.getProjector().projectEveryRow()) {
aggProjectorToBe = new RowProjector(aggProjectorToBe,true);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 4b756fa..1db90a9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2275,5 +2275,19 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
conn.close();
}
}
-
+ // PHOENIX-2876: an aggregate used only in ORDER BY (not projected) must still be
+ // compiled into the scan's server-side aggregators; expects two aggregators here.
+ @Test
+ public void testOrderByWithNoProjection() throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute("create table x (id integer primary key, A.i1 integer," +
+ " B.i2 integer)");
+ Scan scan = projectQuery("select A.i1 from X group by i1 order by avg(B.i2) " +
+ "desc");
+ ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute
+ (BaseScannerRegionObserver.AGGREGATORS), null);
+ assertEquals(2,aggregators.getAggregatorCount());
+ } finally {
+ conn.close();
+ }
+ }
}
[3/3] phoenix git commit: PHOENIX-2888 Aggregations for similar IN
list expressions doesn't work correctly (Sergey Soldatov)
Posted by ja...@apache.org.
PHOENIX-2888 Aggregations for similar IN list expressions doesn't work correctly (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ae14e38c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ae14e38c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ae14e38c
Branch: refs/heads/master
Commit: ae14e38ccbed601dfbda9ec0dbb66bc88d63bae5
Parents: af8d3b6
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:34:47 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:34:47 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/phoenix/expression/InListExpression.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae14e38c/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
index a4a9353..a977f1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
@@ -175,7 +175,7 @@ public class InListExpression extends BaseSingleExpression {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + values.hashCode();
+ result = prime * result + children.hashCode() + values.hashCode();
return result;
}
@@ -185,7 +185,7 @@ public class InListExpression extends BaseSingleExpression {
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
InListExpression other = (InListExpression)obj;
- if (!values.equals(other.values)) return false;
+ if (!children.equals(other.children) || !values.equals(other.values)) return false;
return true;
}