You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/12/14 04:58:15 UTC
[phoenix] branch 4.x updated: PHOENIX-6232 (addendum)Correlated
subquery should not push to RegionServer as the probe side of the Hash join
This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 4b272b1 PHOENIX-6232 (addendum)Correlated subquery should not push to RegionServer as the probe side of the Hash join
4b272b1 is described below
commit 4b272b106a239b4bc2dbbdaacca592a80f569bba
Author: chenglei <ch...@apache.org>
AuthorDate: Mon Dec 14 12:57:33 2020 +0800
PHOENIX-6232 (addendum)Correlated subquery should not push to RegionServer as the probe side of the Hash join
---
.../phoenix/end2end/join/HashJoinMoreIT.java | 12 ++++
.../org/apache/phoenix/compile/JoinCompiler.java | 81 ++++++++++------------
.../org/apache/phoenix/compile/QueryCompiler.java | 67 ++++++++++++++----
.../apache/phoenix/compile/QueryCompilerTest.java | 17 ++++-
4 files changed, 118 insertions(+), 59 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
index f0f411f..f83be66 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
@@ -950,6 +950,8 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (3,22)");
conn.commit();
+ //test where the LHS is a flat table and a NonCorrelated subquery is pushed down as a preFilter;
+ //would use HashJoin.
sql="select a.aid from " + tableName1 + " a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select code from " + tableName2 + " c where c.bid = 2) order by a.aid";
@@ -960,6 +962,8 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
assertTrue(rs.getInt(1) == 3);
assertTrue(!rs.next());
+ //test where the LHS is a subselect and a NonCorrelated subquery is pushed down as a preFilter;
+ //would use HashJoin.
sql = "select a.aid from (select aid,age from " + tableName1 + " where age >=11 and age<=33) a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select code from " + tableName2 + " c where c.bid = 2) order by a.aid";
@@ -970,12 +974,19 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
assertTrue(rs.getInt(1) == 3);
assertTrue(!rs.next());
+ //test where the LHS is a subselect and an aggregate NonCorrelated subquery is pushed down as a preFilter;
+ //would use HashJoin.
sql = "select a.aid from (select aid,age from " + tableName1 + " where age >=11 and age<=33) a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select max(code) from " + tableName2 + " c where c.bid >= 1) order by a.aid";
rs = conn.prepareStatement(sql).executeQuery();
assertTrue(!rs.next());
+ /**
+ * test where the LHS is a subselect and has an aggregate Correlated subquery as a preFilter,
+ * but the aggregate Correlated subquery would be rewritten as a HashJoin before
+ * {@link JoinCompiler#compile}.
+ */
sql = "select a.aid from (select aid,age from " + tableName1 + " where age >=11 and age<=33) a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select max(code) from " + tableName2 + " c where c.bid = a.aid) order by a.aid";
@@ -998,6 +1009,7 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
conn.createStatement().execute("UPSERT INTO " + tableName3 + "(id,test_id,lastchanged) VALUES (2,100,'2011-11-11 11:11:11')");
conn.commit();
+ //test where the LHS is a Correlated subquery; the RHS would be the probe side of the Hash join.
sql= "SELECT AAA.* FROM " +
"(SELECT id, test_id, lastchanged FROM " + tableName3 + " T " +
" WHERE lastchanged = ( SELECT max(lastchanged) FROM " + tableName3 + " WHERE test_id = T.test_id )) AAA " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 46bf2a2..e22fdc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -38,6 +38,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.HashJoinRegionScanner;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -123,6 +124,9 @@ public class JoinCompiler {
}
private final PhoenixStatement phoenixStatement;
+ /**
+ * The original join sql for current {@link JoinCompiler}.
+ */
private final SelectStatement originalJoinSelectStatement;
private final ColumnResolver origResolver;
private final boolean useStarJoin;
@@ -314,7 +318,7 @@ public class JoinCompiler {
return allLeftJoin;
}
- public SelectStatement getStatement() {
+ public SelectStatement getOriginalJoinSelectStatement() {
return originalJoinSelectStatement;
}
@@ -785,7 +789,7 @@ public class JoinCompiler {
* Only make sense for {@link #isSubselect()}.
* {@link #postFilterParseNodes} could not as this
* {@link Table}'s where conditions, but need to filter after
- * {@link #getSelectStatementByApplyPreFiltersIfSubselect()}
+ * {@link #getSelectStatementByApplyPreFiltersForSubselect()}
* is executed.
*/
private final List<ParseNode> postFilterParseNodes;
@@ -963,14 +967,33 @@ public class JoinCompiler {
public SelectStatement getAsSubquery(List<OrderByNode> newOrderByNodes) throws SQLException {
if (isSubselect()) {
return SubselectRewriter.applyOrderByAndPostFilters(
- this.getSelectStatementByApplyPreFiltersIfSubselect(),
+ this.getSelectStatementByApplyPreFiltersForSubselect(),
newOrderByNodes,
tableNode.getAlias(),
postFilterParseNodes);
}
- //for flat table, postFilters is empty , because it can safely pushed down as preFilters.
+
+ /**
+ * For a flat table, {@link #postFilterParseNodes} is empty, because they can safely be pushed down as
+ * {@link #preFilterParseNodes}.
+ */
assert postFilterParseNodes == null || postFilterParseNodes.isEmpty();
- return this.getSelectStatementByApplyPreFiltersIfNotSubselect(newOrderByNodes);
+ return NODE_FACTORY.select(
+ tableNode,
+ originalJoinSelectStatement.getHint(),
+ false,
+ getSelectAliasedNodes(),
+ getCombinedPreFilterParseNodes(),
+ null,
+ null,
+ newOrderByNodes,
+ null,
+ null,
+ 0,
+ false,
+ originalJoinSelectStatement.hasSequence(),
+ Collections.<SelectStatement> emptyList(),
+ originalJoinSelectStatement.getUdfParseNodes());
}
public SelectStatement getAsSubqueryForOptimization(boolean applyGroupByOrOrderBy) throws SQLException {
@@ -1044,22 +1067,11 @@ public class JoinCompiler {
return false;
}
- SelectStatement selectStatementToUse =
- this.getSelectStatementByApplyPreFilters();
+ SelectStatement selectStatementToUse = this.getAsSubquery(null);
RewriteResult rewriteResult =
ParseNodeUtil.rewrite(selectStatementToUse, phoenixStatement.getConnection());
- return JoinCompiler.isCouldPushToServerAsHashJoinProbeSide(rewriteResult.getRewrittenSelectStatement());
- }
-
- /**
- * Get this {@link Table}'s new {@link SelectStatement} only applying
- * {@link #preFilterParseNodes}.
- * @return
- */
- private SelectStatement getSelectStatementByApplyPreFilters() {
- return this.isSubselect() ?
- this.getSelectStatementByApplyPreFiltersIfSubselect() :
- this.getSelectStatementByApplyPreFiltersIfNotSubselect(null);
+ return JoinCompiler.isCouldPushToServerAsHashJoinProbeSide(
+ rewriteResult.getRewrittenSelectStatement());
}
/**
@@ -1067,7 +1079,7 @@ public class JoinCompiler {
* {@link #preFilterParseNodes} for {@link #isSubselect()}.
* @return
*/
- private SelectStatement getSelectStatementByApplyPreFiltersIfSubselect() {
+ private SelectStatement getSelectStatementByApplyPreFiltersForSubselect() {
return SubselectRewriter.applyPreFiltersForSubselect(
subselectStatement,
preFilterParseNodes,
@@ -1075,30 +1087,6 @@ public class JoinCompiler {
}
- /**
- * Get this {@link Table}'s new {@link SelectStatement} only applying
- * {@link #preFilterParseNodes} if not {@link #isSubselect()}.
- * @return
- */
- private SelectStatement getSelectStatementByApplyPreFiltersIfNotSubselect(List<OrderByNode> newOrderByNodes) {
- return NODE_FACTORY.select(
- tableNode,
- originalJoinSelectStatement.getHint(),
- false,
- getSelectAliasedNodes(),
- getCombinedPreFilterParseNodes(),
- null,
- null,
- newOrderByNodes,
- null,
- null,
- 0,
- false,
- originalJoinSelectStatement.hasSequence(),
- Collections.<SelectStatement> emptyList(),
- originalJoinSelectStatement.getUdfParseNodes());
- }
-
protected boolean isWildCardSelect() {
return isWildcard;
}
@@ -1470,11 +1458,14 @@ public class JoinCompiler {
/**
* Check if this {@link Table} could be pushed to RegionServer
* {@link HashJoinRegionScanner} as the probe side of Hash join.
- * Note: the {@link SelectStatement} parameter should be rewritten by
+ * Note: the {@link SelectStatement} parameter must be rewritten by
* {@link ParseNodeUtil#rewrite} before this method.
* {@link SelectStatement} parameter could has NonCorrelated subquery,
* but for Correlated subquery, {@link ParseNodeUtil#rewrite} rewrite
* it as join.
+ * Note: {@link SelectStatement} could also have {@link OrderBy}, but we
+ * can ignore the {@link OrderBy} because we do not guarantee the {@link OrderBy}
+ * after join.
* @param selectStatement
* @return
*/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 4a8f946..472b581 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -378,7 +378,14 @@ public class QueryCompiler {
joinExpressions[i] = joinConditions.getFirst();
List<Expression> hashExpressions = joinConditions.getSecond();
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
+ boolean optimized = getKeyExpressionCombinations(
+ keyRangeExpressions,
+ context,
+ joinTable.getOriginalJoinSelectStatement(),
+ tableRef,
+ joinSpec.getType(),
+ joinExpressions[i],
+ hashExpressions);
Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
joinTypes[i] = joinSpec.getType();
@@ -405,7 +412,7 @@ public class QueryCompiler {
}
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
- return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
+ return HashJoinPlan.create(joinTable.getOriginalJoinSelectStatement(), plan, joinInfo, hashPlans);
}
case HASH_BUILD_LEFT: {
JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
@@ -468,10 +475,29 @@ public class QueryCompiler {
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions},
new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true},
new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
- boolean usePersistentCache = joinTable.getStatement().getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
+ boolean usePersistentCache = joinTable.getOriginalJoinSelectStatement().getHint().hasHint(Hint.USE_PERSISTENT_CACHE);
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
- getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
- return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, usePersistentCache, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
+ getKeyExpressionCombinations(
+ keyRangeExpressions,
+ context,
+ joinTable.getOriginalJoinSelectStatement(),
+ rhsTableRef,
+ type,
+ joinExpressions,
+ hashExpressions);
+ return HashJoinPlan.create(
+ joinTable.getOriginalJoinSelectStatement(),
+ rhsPlan,
+ joinInfo,
+ new HashSubPlan[]{
+ new HashSubPlan(
+ 0,
+ lhsPlan,
+ hashExpressions,
+ false,
+ usePersistentCache,
+ keyRangeExpressions.getFirst(),
+ keyRangeExpressions.getSecond())});
}
case SORT_MERGE: {
JoinTable lhsJoin = joinTable.createSubJoinTable(statement.getConnection());
@@ -511,13 +537,13 @@ public class QueryCompiler {
int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0;
PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
- ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes());
+ ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getOriginalJoinSelectStatement().getUdfParseNodes());
TableRef tableRef = resolver.getTables().get(0);
StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
subCtx.setCurrentTable(tableRef);
QueryPlan innerPlan = new SortMergeJoinPlan(
subCtx,
- joinTable.getStatement(),
+ joinTable.getOriginalJoinSelectStatement(),
tableRef,
type == JoinType.Right ? JoinType.Left : type,
lhsPlan,
@@ -534,12 +560,27 @@ public class QueryCompiler {
context.setResolver(resolver);
TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
ParseNode where = joinTable.getPostFiltersCombined();
- SelectStatement select = asSubquery
- ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
- Collections.<AliasedNode>emptyList(), where, null, null, orderBy, null, null, 0, false,
- joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(),
- joinTable.getStatement().getUdfParseNodes())
- : NODE_FACTORY.select(joinTable.getStatement(), from, where);
+ SelectStatement select = asSubquery ?
+ NODE_FACTORY.select(
+ from,
+ joinTable.getOriginalJoinSelectStatement().getHint(),
+ false,
+ Collections.<AliasedNode>emptyList(),
+ where,
+ null,
+ null,
+ orderBy,
+ null,
+ null,
+ 0,
+ false,
+ joinTable.getOriginalJoinSelectStatement().hasSequence(),
+ Collections.<SelectStatement>emptyList(),
+ joinTable.getOriginalJoinSelectStatement().getUdfParseNodes()) :
+ NODE_FACTORY.select(
+ joinTable.getOriginalJoinSelectStatement(),
+ from,
+ where);
return compileSingleFlatQuery(
context,
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index f530aed..ac32956 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -6612,6 +6612,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
" CONSTRAINT my_pk PRIMARY KEY (id))";
conn.createStatement().execute(sql);
+ //test where the LHS is a Correlated subquery; the RHS would be the probe side of the Hash join.
sql= "SELECT AAA.* FROM " +
"(SELECT id, test_id, lastchanged FROM test T " +
" WHERE lastchanged = ( SELECT max(lastchanged) FROM test WHERE test_id = T.test_id )) AAA " +
@@ -6631,6 +6632,8 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
assertTrue(
((TupleProjectionPlan)(subPlans[0].getInnerPlan())).getDelegate() instanceof HashJoinPlan);
+ //test where the LHS is a Correlated subquery and the RHS could not be the probe side of the hash join,
+ //so SortMergeJoinPlan is used
sql= "SELECT AAA.* FROM " +
"(SELECT id, test_id, lastchanged FROM test T " +
" WHERE lastchanged = ( SELECT max(lastchanged) FROM test WHERE test_id = T.test_id )) AAA " +
@@ -6641,6 +6644,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
assertTrue(queryPlan instanceof ClientScanPlan);
assertTrue(((ClientScanPlan)queryPlan).getDelegate() instanceof SortMergeJoinPlan);
+ //test where the LHS is a NonCorrelated subquery; would use HashJoin.
String GRAMMAR_TABLE = "CREATE TABLE IF NOT EXISTS GRAMMAR_TABLE (ID INTEGER PRIMARY KEY, " +
"unsig_id UNSIGNED_INT, big_id BIGINT, unsig_long_id UNSIGNED_LONG, tiny_id TINYINT," +
"unsig_tiny_id UNSIGNED_TINYINT, small_id SMALLINT, unsig_small_id UNSIGNED_SMALLINT," +
@@ -6687,7 +6691,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
String tableName1 = generateUniqueName();
String tableName2 = generateUniqueName();
- sql="CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+
+ sql="CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+
"AID INTEGER PRIMARY KEY,"+
"AGE INTEGER"+
")";
@@ -6699,6 +6703,8 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
")";
conn.createStatement().execute(sql);
+ //test where the LHS is a flat table and a NonCorrelated subquery is pushed down as a preFilter;
+ //would use HashJoin.
sql="select a.aid from " + tableName1 + " a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select code from " + tableName2 + " c where c.bid = 2) order by a.aid";
@@ -6718,6 +6724,8 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
"SELECT CODE FROM " + tableName2 + " C WHERE C.BID = 2 LIMIT 2");
assertTrue(subPlans[1] instanceof HashSubPlan);
+ //test where the LHS is a subselect and a NonCorrelated subquery is pushed down as a preFilter;
+ //would use HashJoin.
sql="select a.aid from (select aid,age from " + tableName1 + " where age >=11 and age<=33) a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select code from " + tableName2 + " c where c.bid = 2) order by a.aid";
@@ -6737,6 +6745,8 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
"SELECT CODE FROM " + tableName2 + " C WHERE C.BID = 2 LIMIT 2");
assertTrue(subPlans[1] instanceof HashSubPlan);
+ //test where the LHS is a subselect and an aggregate NonCorrelated subquery is pushed down as a preFilter;
+ //would use HashJoin.
sql = "select a.aid from (select aid,age from " + tableName1 + " where age >=11 and age<=33) a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select max(code) from " + tableName2 + " c where c.bid >= 1) order by a.aid";
@@ -6756,6 +6766,11 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
"SELECT MAX(CODE) FROM " + tableName2 + " C WHERE C.BID >= 1 LIMIT 2");
assertTrue(subPlans[1] instanceof HashSubPlan);
+ /**
+ * test where the LHS is a subselect and has an aggregate Correlated subquery as a preFilter,
+ * but the aggregate Correlated subquery would be rewritten as a HashJoin before
+ * {@link JoinCompiler#compile}.
+ */
sql = "select a.aid from (select aid,age from " + tableName1 + " where age >=11 and age<=33) a inner join "+
"(select bid,code from " + tableName2 + " where code > 10 limit 3) b on a.aid = b.bid "+
"where a.age > (select max(code) from " + tableName2 + " c where c.bid = a.aid) order by a.aid";