You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/05/13 00:05:39 UTC
[2/3] phoenix git commit: PHOENIX-2876 Using aggregation function in
ORDER BY (Sergey Soldatov)
PHOENIX-2876 Using aggregation function in ORDER BY (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af8d3b65
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af8d3b65
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af8d3b65
Branch: refs/heads/master
Commit: af8d3b65c84fd57b91b99ff36de2194149c5a94e
Parents: d414505
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:32:34 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:32:34 2016 -0700
----------------------------------------------------------------------
.../phoenix/compile/AggregationManager.java | 60 ++++++++++++++++++++
.../apache/phoenix/compile/DeleteCompiler.java | 1 +
.../apache/phoenix/compile/PostDDLCompiler.java | 1 +
.../phoenix/compile/ProjectionCompiler.java | 53 +----------------
.../apache/phoenix/compile/QueryCompiler.java | 1 +
.../apache/phoenix/compile/UpsertCompiler.java | 6 +-
.../phoenix/compile/QueryCompilerTest.java | 16 +++++-
7 files changed, 86 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
index ee2497b..c8e672e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -17,7 +17,21 @@
*/
package org.apache.phoenix.compile;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
/**
*
@@ -52,4 +66,50 @@ public class AggregationManager {
public void setAggregators(ClientAggregators clientAggregator) {
this.aggregators = clientAggregator;
}
+ /**
+ * Compiles projection by:
+ * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
+ * to track how many rows have been scanned.
+ * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
+ * optimize the positional access of the aggregated value.
+ */
+ public void compile(StatementContext context, GroupByCompiler.GroupBy groupBy) throws
+ SQLException {
+ final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
+
+ Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
+ while (expressions.hasNext()) {
+ Expression expression = expressions.next();
+ expression.accept(new SingleAggregateFunctionVisitor() {
+ @Override
+ public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
+ aggFuncSet.add(function);
+ return Iterators.emptyIterator();
+ }
+ });
+ }
+ if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
+ return;
+ }
+ List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
+ Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
+
+ int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+ context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
+ ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
+ context.getAggregationManager().setAggregators(clientAggregators);
+ }
+
+ private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+ int minNullableIndex = aggFuncs.size();
+ for (int i = 0; i < aggFuncs.size(); i++) {
+ SingleAggregateFunction aggFunc = aggFuncs.get(i);
+ if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
+ minNullableIndex = i;
+ break;
+ }
+ }
+ return minNullableIndex;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 2a97686..fa3dd62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -508,6 +508,7 @@ public class DeleteCompiler {
// Ignoring ORDER BY, since with auto commit on and no limit makes no difference
SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+ context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
if (plan.getProjector().projectEveryRow()) {
projectorToBe = new RowProjector(projectorToBe,true);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index a786438..e43b596 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -253,6 +253,7 @@ public class PostDDLCompiler {
}
// Need to project all column families into the scan, since we haven't yet created our empty key value
RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
+ context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
// Explicitly project these column families and don't project the empty key value,
// since at this point we haven't added the empty key value everywhere.
if (columnFamilies != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 3cf3934..8d7d7cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -188,8 +188,8 @@ public class ProjectionCompiler {
try {
dataTable = conn.getTable(new PTableKey(tenantId, tableName));
} catch (TableNotFoundException e) {
- if (tenantId != null) {
- // Check with null tenantId
+ if (tenantId != null) {
+ // Check with null tenantId
dataTable = conn.getTable(new PTableKey(null, tableName));
}
else {
@@ -483,8 +483,6 @@ public class ProjectionCompiler {
}
}
}
-
- selectVisitor.compile();
boolean isProjectEmptyKeyValue = false;
if (isWildcard) {
projectAllColumnFamilies(table, scan);
@@ -576,18 +574,7 @@ public class ProjectionCompiler {
}
private static class SelectClauseVisitor extends ExpressionCompiler {
- private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
- int minNullableIndex = aggFuncs.size();
- for (int i = 0; i < aggFuncs.size(); i++) {
- SingleAggregateFunction aggFunc = aggFuncs.get(i);
- if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
- minNullableIndex = i;
- break;
- }
- }
- return minNullableIndex;
- }
-
+
/**
* Track whether or not the projection expression is case sensitive. We use this
* information to determine whether or not we normalize the column name passed
@@ -613,40 +600,6 @@ public class ProjectionCompiler {
reset();
}
-
- /**
- * Compiles projection by:
- * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
- * to track how many rows have been scanned.
- * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
- * optimize the positional access of the aggregated value.
- */
- private void compile() throws SQLException {
- final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
-
- Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
- while (expressions.hasNext()) {
- Expression expression = expressions.next();
- expression.accept(new SingleAggregateFunctionVisitor() {
- @Override
- public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
- aggFuncSet.add(function);
- return Iterators.emptyIterator();
- }
- });
- }
- if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
- return;
- }
- List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
- Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
-
- int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
- context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
- ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
- context.getAggregationManager().setAggregators(clientAggregators);
- }
-
@Override
public void reset() {
super.reset();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 82c9731..4488aff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -564,6 +564,7 @@ public class QueryCompiler {
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, offset, projector,
groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder);
+ context.getAggregationManager().compile(context, groupBy);
// Final step is to build the query plan
if (!asSubquery) {
int maxRows = statement.getMaxRows();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7c6347f..e2fc2ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -636,7 +636,11 @@ public class UpsertCompiler {
PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
- RowProjector aggProjectorToBe = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY);
+ StatementContext statementContext = queryPlan.getContext();
+ RowProjector aggProjectorToBe = ProjectionCompiler.compile(statementContext, select, GroupBy
+ .EMPTY_GROUP_BY);
+ statementContext.getAggregationManager().compile(queryPlan.getContext()
+ ,GroupBy.EMPTY_GROUP_BY);
if (queryPlan.getProjector().projectEveryRow()) {
aggProjectorToBe = new RowProjector(aggProjectorToBe,true);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 4b756fa..1db90a9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2275,5 +2275,19 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
conn.close();
}
}
-
+ @Test
+ public void testOrderByWithNoProjection() throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute("create table x (id integer primary key, A.i1 integer," +
+ " B.i2 integer)");
+ Scan scan = projectQuery("select A.i1 from X group by i1 order by avg(B.i2) " +
+ "desc");
+ ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute
+ (BaseScannerRegionObserver.AGGREGATORS), null);
+ assertEquals(2,aggregators.getAggregatorCount());
+ } finally {
+ conn.close();
+ }
+ }
}