You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/10/14 23:50:17 UTC
git commit: PHOENIX-943 Handle pushed down post-filters for subquery
in joins with limit and non-groupby aggregation
Repository: phoenix
Updated Branches:
refs/heads/master 34bfc63c9 -> 8ece1a74a
PHOENIX-943 Handle pushed down post-filters for subquery in joins with limit and non-groupby aggregation
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ece1a74
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ece1a74
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ece1a74
Branch: refs/heads/master
Commit: 8ece1a74afae2112b4959147b19fbf0e215b8de7
Parents: 34bfc63
Author: maryannxue <ma...@apache.org>
Authored: Tue Oct 14 17:49:53 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Tue Oct 14 17:49:53 2014 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/HashJoinIT.java | 49 +++++++++
.../apache/phoenix/compile/JoinCompiler.java | 64 +++++++----
.../apache/phoenix/compile/QueryCompiler.java | 14 ++-
.../phoenix/compile/StatementContext.java | 12 +--
.../phoenix/compile/SubselectRewriter.java | 11 +-
.../phoenix/execute/DelegateQueryPlan.java | 105 ++++++++++++++++++
.../apache/phoenix/execute/HashJoinPlan.java | 106 +++----------------
.../phoenix/execute/TupleProjectionPlan.java | 100 +++++++++++++++++
.../apache/phoenix/join/HashCacheClient.java | 9 +-
.../org/apache/phoenix/join/TupleProjector.java | 6 ++
10 files changed, 334 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 8e35216..da8f447 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -41,6 +41,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Collection;
@@ -3379,6 +3380,36 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
}
@Test
+ public void testJoinWithSubqueryPostFilters() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " LIMIT 5) AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')";
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000003");
+ assertEquals(rs.getString(2), "T3");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000004");
+ assertEquals(rs.getString(2), "T4");
+ assertEquals(rs.getString(3), "0000000002");
+ assertEquals(rs.getString(4), "S2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "0000000005");
+ assertEquals(rs.getString(2), "T5");
+ assertEquals(rs.getString(3), "0000000005");
+ assertEquals(rs.getString(4), "S5");
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
public void testJoinWithSubqueryAndAggregation() throws Exception {
String query1 = "SELECT i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN (SELECT name, \"item_id\" iid FROM "
+ JOIN_ITEM_TABLE_FULL_NAME + ") AS i ON o.\"item_id\" = i.iid GROUP BY i.name ORDER BY i.name";
@@ -3796,5 +3827,23 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
}
}
+ @Test
+ public void testUnsupportedJoinConditions() throws Exception {
+ String query = "SELECT * FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON (item.\"supplier_id\" || supp.\"supplier_id\") = ''";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ try {
+ statement.executeQuery();
+ fail("Should have got SQLFeatureNotSupportedException.");
+ } catch (SQLFeatureNotSupportedException e) {
+ assertEquals("Does not support non-standard or non-equi join conditions.", e.getMessage());
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 7e5382e..ad1c6a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -313,6 +313,9 @@ public class JoinCompiler {
for (ParseNode node : table.getPreFilters()) {
node.accept(prefilterRefVisitor);
}
+ for (ParseNode node : table.getPostFilters()) {
+ node.accept(generalRefVisitor);
+ }
for (ParseNode node : postFilters) {
node.accept(generalRefVisitor);
}
@@ -331,22 +334,12 @@ public class JoinCompiler {
}
}
- public Expression compilePostFilterExpression(StatementContext context) throws SQLException {
- if (postFilters.isEmpty())
- return null;
-
- ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
- List<Expression> expressions = new ArrayList<Expression>(postFilters.size());
- for (ParseNode postFilter : postFilters) {
- expressionCompiler.reset();
- Expression expression = postFilter.accept(expressionCompiler);
- expressions.add(expression);
+ public Expression compilePostFilterExpression(StatementContext context, Table table) throws SQLException {
+ List<ParseNode> filtersCombined = Lists.<ParseNode> newArrayList(postFilters);
+ if (table != null) {
+ filtersCombined.addAll(table.getPostFilters());
}
-
- if (expressions.size() == 1)
- return expressions.get(0);
-
- return AndExpression.create(expressions);
+ return JoinCompiler.compilePostFilterExpression(context, filtersCombined);
}
/**
@@ -450,6 +443,9 @@ public class JoinCompiler {
this.dependencies = new HashSet<TableRef>();
OnNodeVisitor visitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable);
onNode.accept(visitor);
+ if (onConditions.isEmpty()) {
+ visitor.throwUnsupportedJoinConditionException();
+ }
}
public JoinType getType() {
@@ -609,6 +605,7 @@ public class JoinCompiler {
private final List<AliasedNode> selectNodes; // all basic nodes related to this table, no aggregation.
private final List<ParseNode> preFilters;
private final List<ParseNode> postFilters;
+ private final boolean isPostFilterConvertible;
private Table(TableNode tableNode, List<ColumnDef> dynamicColumns,
List<AliasedNode> selectNodes, TableRef tableRef) {
@@ -619,6 +616,7 @@ public class JoinCompiler {
this.selectNodes = selectNodes;
this.preFilters = new ArrayList<ParseNode>();
this.postFilters = Collections.<ParseNode>emptyList();
+ this.isPostFilterConvertible = false;
}
private Table(DerivedTableNode tableNode,
@@ -628,8 +626,9 @@ public class JoinCompiler {
this.subselect = SubselectRewriter.flatten(tableNode.getSelect(), statement.getConnection());
this.tableRef = tableRef;
this.selectNodes = selectNodes;
- this.preFilters = Collections.<ParseNode>emptyList();
+ this.preFilters = new ArrayList<ParseNode>();
this.postFilters = new ArrayList<ParseNode>();
+ this.isPostFilterConvertible = SubselectRewriter.isPostFilterConvertible(subselect);
}
public TableNode getTableNode() {
@@ -661,21 +660,24 @@ public class JoinCompiler {
}
public void addFilter(ParseNode filter) {
- if (!isSubselect()) {
+ if (!isSubselect() || isPostFilterConvertible) {
preFilters.add(filter);
- return;
+ } else {
+ postFilters.add(filter);
}
-
- postFilters.add(filter);
}
public ParseNode getPreFiltersCombined() {
return combine(preFilters);
}
+ public Expression compilePostFilterExpression(StatementContext context) throws SQLException {
+ return JoinCompiler.compilePostFilterExpression(context, postFilters);
+ }
+
public SelectStatement getAsSubquery() throws SQLException {
if (isSubselect())
- return SubselectRewriter.applyPostFilters(subselect, postFilters, tableNode.getAlias());
+ return SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias());
List<TableNode> from = Collections.<TableNode>singletonList(tableNode);
return NODE_FACTORY.select(from, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false, select.hasSequence());
@@ -970,7 +972,7 @@ public class JoinCompiler {
* 2) a boolean condition referencing to the self table only.
* Otherwise, it can be ambiguous.
*/
- private void throwUnsupportedJoinConditionException()
+ public void throwUnsupportedJoinConditionException()
throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException("Does not support non-standard or non-equi join conditions.");
}
@@ -1093,6 +1095,24 @@ public class JoinCompiler {
return ret;
}
+ private static Expression compilePostFilterExpression(StatementContext context, List<ParseNode> postFilters) throws SQLException {
+ if (postFilters.isEmpty())
+ return null;
+
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ List<Expression> expressions = new ArrayList<Expression>(postFilters.size());
+ for (ParseNode postFilter : postFilters) {
+ expressionCompiler.reset();
+ Expression expression = postFilter.accept(expressionCompiler);
+ expressions.add(expression);
+ }
+
+ if (expressions.size() == 1)
+ return expressions.get(0);
+
+ return AndExpression.create(expressions);
+ }
+
public static SelectStatement optimize(PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
TableRef groupByTableRef = null;
TableRef orderByTableRef = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index a2dc5b3..d82ac02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -157,8 +158,7 @@ public class QueryCompiler {
QueryPlan plan = compileSubquery(subquery);
ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
context.setResolver(projectedTable.createColumnResolver());
- context.setClientTupleProjector(projectedTable.createTupleProjector());
- return plan;
+ return new TupleProjectionPlan(plan, projectedTable.createTupleProjector(), table.compilePostFilterExpression(context));
}
boolean[] starJoinVector = joinTable.getStarJoinVector();
@@ -197,7 +197,6 @@ public class QueryCompiler {
StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true);
ColumnResolver resolver = subContext.getResolver();
- TupleProjector clientProjector = subContext.getClientTupleProjector();
boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
if (hasPostReference) {
PTableWrapper subProjTable = ((JoinedTableColumnResolver) (resolver)).getPTableWrapper();
@@ -225,7 +224,7 @@ public class QueryCompiler {
if (i < count - 1) {
fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
}
- subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters);
+ subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, hasFilters);
}
if (needsProject) {
TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
@@ -233,7 +232,7 @@ public class QueryCompiler {
context.setCurrentTable(tableRef);
context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, joinTable.isAllLeftJoin());
- Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
+ Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
Integer limit = null;
if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
limit = LimitCompiler.compile(context, query);
@@ -258,7 +257,6 @@ public class QueryCompiler {
StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true);
ColumnResolver lhsResolver = lhsCtx.getResolver();
- TupleProjector clientProjector = lhsCtx.getClientTupleProjector();
PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper();
ProjectedPTableWrapper rhsProjTable;
TableRef rhsTableRef;
@@ -288,7 +286,7 @@ public class QueryCompiler {
context.setCurrentTable(rhsTableRef);
context.setResolver(projectedTable.createColumnResolver());
QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, type == JoinType.Right);
- Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
+ Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
Integer limit = null;
if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
limit = LimitCompiler.compile(context, rhs);
@@ -296,7 +294,7 @@ public class QueryCompiler {
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions);
- return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())});
+ return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), lhsJoin.hasFilters())});
}
// Do not support queries like "A right join B left join C" with hash-joins.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index b0ba6f0..47ce3c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -73,7 +72,6 @@ public class StatementContext {
private TableRef currentTable;
private List<Pair<byte[], byte[]>> whereConditionColumns;
- private TupleProjector clientTupleProjector;
private TimeRange scanTimeRange = null;
private Map<SelectStatement, Object> subqueryResults;
@@ -85,7 +83,7 @@ public class StatementContext {
public StatementContext(PhoenixStatement statement, Scan scan) {
this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement));
}
-
+
public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) {
this.statement = statement;
this.resolver = resolver;
@@ -237,14 +235,6 @@ public class StatementContext {
public List<Pair<byte[], byte[]>> getWhereCoditionColumns() {
return whereConditionColumns;
}
-
- public TupleProjector getClientTupleProjector() {
- return clientTupleProjector;
- }
-
- public void setClientTupleProjector(TupleProjector projector) {
- this.clientTupleProjector = projector;
- }
public void setScanTimeRange(TimeRange value){
this.scanTimeRange = value;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index 3edcbc2..35ea900 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -19,7 +19,6 @@
package org.apache.phoenix.compile;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -50,15 +49,15 @@ public class SubselectRewriter extends ParseNodeRewriter {
if (postFilters.isEmpty())
return statement;
- // TODO Handle post-filters in the below two cases from JoinCompiler:
- // 1) select ... from A join (select id, b from T limit 10) as B on A.id = B.id where B.b = 'b'
- // 2) select ... from A join (select count(*) c from T) as B on A.a = B.c where B.c > 10
- if (statement.getLimit() != null || (statement.isAggregate() && statement.getGroupBy().isEmpty()))
- throw new SQLFeatureNotSupportedException();
+ assert(isPostFilterConvertible(statement));
return new SubselectRewriter(null, statement.getSelect(), subqueryAlias).applyPostFilters(statement, postFilters);
}
+ public static boolean isPostFilterConvertible(SelectStatement statement) throws SQLException {
+ return statement.getLimit() == null && (!statement.isAggregate() || !statement.getGroupBy().isEmpty());
+ }
+
public static SelectStatement flatten(SelectStatement select, PhoenixConnection connection) throws SQLException {
List<TableNode> from = select.getFrom();
while (from.size() == 1 && from.get(0) instanceof DerivedTableNode) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
new file mode 100644
index 0000000..4d50ba0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.ParameterMetaData;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.TableRef;
+
+public abstract class DelegateQueryPlan implements QueryPlan {
+ protected final QueryPlan delegate;
+
+ public DelegateQueryPlan(QueryPlan delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return delegate.getContext();
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return delegate.getParameterMetaData();
+ }
+
+ @Override
+ public long getEstimatedSize() {
+ return delegate.getEstimatedSize();
+ }
+
+ @Override
+ public TableRef getTableRef() {
+ return delegate.getTableRef();
+ }
+
+ @Override
+ public RowProjector getProjector() {
+ return delegate.getProjector();
+ }
+
+ @Override
+ public Integer getLimit() {
+ return delegate.getLimit();
+ }
+
+ @Override
+ public OrderBy getOrderBy() {
+ return delegate.getOrderBy();
+ }
+
+ @Override
+ public GroupBy getGroupBy() {
+ return delegate.getGroupBy();
+ }
+
+ @Override
+ public List<KeyRange> getSplits() {
+ return delegate.getSplits();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return delegate.getScans();
+ }
+
+ @Override
+ public FilterableStatement getStatement() {
+ return delegate.getStatement();
+ }
+
+ @Override
+ public boolean isDegenerate() {
+ return delegate.isDegenerate();
+ }
+
+ @Override
+ public boolean isRowKeyOrdered() {
+ return delegate.isRowKeyOrdered();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index bb3940c..fce4245 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.execute;
import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
-import java.sql.ParameterMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
@@ -38,8 +37,6 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.FromCompiler;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
@@ -59,31 +56,27 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PArrayDataType;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import com.google.common.collect.Lists;
-public class HashJoinPlan implements QueryPlan {
+public class HashJoinPlan extends DelegateQueryPlan {
private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
private final FilterableStatement statement;
- private final BaseQueryPlan plan;
private final HashJoinInfo joinInfo;
private final SubPlan[] subPlans;
private final boolean recompileWhereClause;
@@ -98,7 +91,7 @@ public class HashJoinPlan implements QueryPlan {
public static HashJoinPlan create(FilterableStatement statement,
QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
if (plan instanceof BaseQueryPlan)
- return new HashJoinPlan(statement, (BaseQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
+ return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null);
assert (plan instanceof HashJoinPlan);
HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
@@ -111,13 +104,13 @@ public class HashJoinPlan implements QueryPlan {
for (SubPlan subPlan : subPlans) {
mergedSubPlans[i++] = subPlan;
}
- return new HashJoinPlan(statement, hashJoinPlan.plan, joinInfo, mergedSubPlans, true);
+ return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true);
}
private HashJoinPlan(FilterableStatement statement,
- BaseQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+ QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+ super(plan);
this.statement = statement;
- this.plan = plan;
this.joinInfo = joinInfo;
this.subPlans = subPlans;
this.recompileWhereClause = recompileWhereClause;
@@ -126,21 +119,6 @@ public class HashJoinPlan implements QueryPlan {
}
@Override
- public Integer getLimit() {
- return plan.getLimit();
- }
-
- @Override
- public OrderBy getOrderBy() {
- return plan.getOrderBy();
- }
-
- @Override
- public RowProjector getProjector() {
- return plan.getProjector();
- }
-
- @Override
public ResultIterator iterator() throws SQLException {
int count = subPlans.length;
PhoenixConnection connection = getContext().getConnection();
@@ -149,7 +127,7 @@ public class HashJoinPlan implements QueryPlan {
List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count);
dependencies = Lists.newArrayList();
if (joinInfo != null) {
- hashClient = new HashCacheClient(plan.getContext().getConnection());
+ hashClient = new HashCacheClient(delegate.getContext().getConnection());
maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
firstJobEndTime = new AtomicLong(0);
keyRangeExpressions = new CopyOnWriteArrayList<Expression>();
@@ -194,24 +172,24 @@ public class HashJoinPlan implements QueryPlan {
boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty();
if (recompileWhereClause || hasKeyRangeExpressions) {
- StatementContext context = plan.getContext();
+ StatementContext context = delegate.getContext();
PTable table = context.getCurrentTable().getTable();
ParseNode viewWhere = table.getViewStatement() == null ? null : new SQLParser(table.getViewStatement()).parseQuery().getWhere();
- context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (plan.getStatement()), plan.getContext().getConnection()));
+ context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (delegate.getStatement()), delegate.getContext().getConnection()));
if (recompileWhereClause) {
- WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere);
+ WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere);
}
if (hasKeyRangeExpressions) {
- WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere, keyRangeExpressions, true);
+ WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, true);
}
}
if (joinInfo != null) {
- Scan scan = plan.getContext().getScan();
+ Scan scan = delegate.getContext().getScan();
HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
}
- return plan.iterator(dependencies);
+ return ((BaseQueryPlan) delegate).iterator(dependencies);
}
private Expression createKeyRangeExpression(Expression lhsExpression,
@@ -254,18 +232,8 @@ public class HashJoinPlan implements QueryPlan {
}
@Override
- public long getEstimatedSize() {
- return plan.getEstimatedSize();
- }
-
- @Override
- public List<KeyRange> getSplits() {
- return plan.getSplits();
- }
-
- @Override
public ExplainPlan getExplainPlan() throws SQLException {
- List<String> planSteps = Lists.newArrayList(plan.getExplainPlan().getPlanSteps());
+ List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
int count = subPlans.length;
for (int i = 0; i < count; i++) {
planSteps.addAll(subPlans[i].getPreSteps(this));
@@ -285,35 +253,10 @@ public class HashJoinPlan implements QueryPlan {
}
@Override
- public ParameterMetaData getParameterMetaData() {
- return plan.getParameterMetaData();
- }
-
- @Override
- public StatementContext getContext() {
- return plan.getContext();
- }
-
- @Override
- public GroupBy getGroupBy() {
- return plan.getGroupBy();
- }
-
- @Override
- public TableRef getTableRef() {
- return plan.getTableRef();
- }
-
- @Override
public FilterableStatement getStatement() {
return statement;
}
- @Override
- public boolean isDegenerate() {
- return false;
- }
-
protected interface SubPlan {
public Object execute(HashJoinPlan parent) throws SQLException;
public void postProcess(Object result, HashJoinPlan parent) throws SQLException;
@@ -398,7 +341,6 @@ public class HashJoinPlan implements QueryPlan {
private final boolean singleValueOnly;
private final Expression keyRangeLhsExpression;
private final Expression keyRangeRhsExpression;
- private final TupleProjector clientProjector;
private final boolean hasFilters;
public HashSubPlan(int index, QueryPlan subPlan,
@@ -406,20 +348,19 @@ public class HashJoinPlan implements QueryPlan {
boolean singleValueOnly,
Expression keyRangeLhsExpression,
Expression keyRangeRhsExpression,
- TupleProjector clientProjector, boolean hasFilters) {
+ boolean hasFilters) {
this.index = index;
this.plan = subPlan;
this.hashExpressions = hashExpressions;
this.singleValueOnly = singleValueOnly;
this.keyRangeLhsExpression = keyRangeLhsExpression;
this.keyRangeRhsExpression = keyRangeRhsExpression;
- this.clientProjector = clientProjector;
this.hasFilters = hasFilters;
}
@Override
public Object execute(HashJoinPlan parent) throws SQLException {
- ScanRanges ranges = parent.plan.getContext().getScanRanges();
+ ScanRanges ranges = parent.delegate.getContext().getScanRanges();
List<ImmutableBytesWritable> keyRangeRhsValues = null;
if (keyRangeRhsExpression != null) {
keyRangeRhsValues = Lists.<ImmutableBytesWritable>newArrayList();
@@ -427,19 +368,16 @@ public class HashJoinPlan implements QueryPlan {
ServerCache cache = null;
if (hashExpressions != null) {
cache = parent.hashClient.addHashCache(ranges, plan.iterator(),
- clientProjector, plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
+ plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.delegate.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
long endTime = System.currentTimeMillis();
boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
- LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", parent.plan.context.getConnection()));
+ LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", parent.delegate.getContext().getConnection()));
}
} else {
assert(keyRangeRhsExpression != null);
ResultIterator iterator = plan.iterator();
for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
- if (clientProjector != null) {
- result = clientProjector.projectResults(result);
- }
// Evaluate key expressions for hash join key range optimization.
ImmutableBytesWritable value = new ImmutableBytesWritable();
keyRangeRhsExpression.reset();
@@ -494,16 +432,6 @@ public class HashJoinPlan implements QueryPlan {
}
}
-
- @Override
- public boolean isRowKeyOrdered() {
- return plan.isRowKeyOrdered();
- }
-
- @Override
- public List<List<Scan>> getScans() {
- return plan.getScans();
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
new file mode 100644
index 0000000..410d386
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.join.TupleProjector;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+import com.google.common.collect.Lists;
+
+public class TupleProjectionPlan extends DelegateQueryPlan {
+ private final TupleProjector tupleProjector;
+ private final Expression postFilter;
+
+ public TupleProjectionPlan(QueryPlan plan, TupleProjector tupleProjector, Expression postFilter) {
+ super(plan);
+ if (tupleProjector == null) throw new IllegalArgumentException("tupleProjector is null");
+ this.tupleProjector = tupleProjector;
+ this.postFilter = postFilter;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
+ if (postFilter != null) {
+ planSteps.add("CLIENT FILTER BY " + postFilter.toString());
+ }
+
+ return new ExplainPlan(planSteps);
+ }
+
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+
+ return new DelegateResultIterator(delegate.iterator()) {
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple tuple = null;
+ while (tuple == null) {
+ tuple = super.next();
+ if (tuple == null) {
+ break;
+ }
+
+ tuple = tupleProjector.projectResults(tuple);
+
+ if (postFilter != null) {
+ postFilter.reset();
+ try {
+ if (postFilter.evaluate(tuple, tempPtr)) {
+ Boolean b = (Boolean)postFilter.getDataType().toObject(tempPtr);
+ if (!b.booleanValue()) {
+ tuple = null;
+ }
+ } else {
+ tuple = null;
+ }
+ } catch (IllegalDataException e) {
+ tuple = null;
+ }
+ }
+ }
+
+ return tuple;
+ }
+
+ @Override
+ public String toString() {
+ return "TupleProjectionResultIterator [projector=" + tupleProjector + ", postFilter="
+ + postFilter + "]";
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index b6245ac..5ea11b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -69,16 +69,16 @@ public class HashCacheClient {
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
- public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+ public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- serialize(ptr, iterator, projector, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
+ serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
}
- private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+ private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
estimatedSize = Math.min(estimatedSize, maxSize);
if (estimatedSize > Integer.MAX_VALUE) {
@@ -99,9 +99,6 @@ public class HashCacheClient {
int nRows = 0;
out.writeInt(nRows); // In the end will be replaced with total number of rows
for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
- if (projector != null) {
- result = projector.projectResults(result);
- }
TupleUtil.write(result, out);
if (baOut.size() > maxSize) {
throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
index 41b3906..e0d9336 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
@@ -250,5 +251,10 @@ public class TupleProjector {
public ValueBitSet getValueBitSet() {
return valueSet;
}
+
+ @Override
+ public String toString() {
+ return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
+ }
}