You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/04 18:39:10 UTC
[1/2] phoenix git commit: PHOENIX-4616 Move join query optimization
out from QueryCompiler into QueryOptimizer
Repository: phoenix
Updated Branches:
refs/heads/4.x-cdh5.11 0fa6d947e -> ce3e5867e
PHOENIX-4616 Move join query optimization out from QueryCompiler into QueryOptimizer
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/98a8bbd1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/98a8bbd1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/98a8bbd1
Branch: refs/heads/4.x-cdh5.11
Commit: 98a8bbd1c48badcd5a0e1ad729f9a2d98ebc1e3f
Parents: 0fa6d94
Author: maryannxue <ma...@gmail.com>
Authored: Wed Apr 4 01:16:10 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Wed Apr 4 19:25:35 2018 +0100
----------------------------------------------------------------------
.../apache/phoenix/end2end/join/BaseJoinIT.java | 2 +
.../apache/phoenix/compile/JoinCompiler.java | 298 ++++++-------------
.../apache/phoenix/compile/QueryCompiler.java | 79 +++--
.../phoenix/compile/SubselectRewriter.java | 5 +
.../apache/phoenix/compile/UpsertCompiler.java | 2 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 2 +-
.../GenSubqueryParamValuesRewriter.java | 153 ++++++++++
.../apache/phoenix/optimize/QueryOptimizer.java | 172 ++++++++++-
.../phoenix/compile/QueryCompilerTest.java | 114 ++++++-
9 files changed, 561 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
index 6e03a37..4d4660c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
@@ -34,6 +34,7 @@ import java.util.regex.Pattern;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
@@ -456,6 +457,7 @@ public abstract class BaseJoinIT extends ParallelStatsDisabledIT {
protected Connection getConnection() throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true");
+ props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, "true");
return DriverManager.getConnection(getUrl(), props);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index cf5a5dc..88e8f50 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -40,7 +40,7 @@ import org.apache.phoenix.expression.AndExpression;
import org.apache.phoenix.expression.CoerceExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.expression.function.MinAggregateFunction;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.AliasedNode;
@@ -53,26 +53,19 @@ import org.apache.phoenix.parse.ComparisonParseNode;
import org.apache.phoenix.parse.ConcreteTableNode;
import org.apache.phoenix.parse.DerivedTableNode;
import org.apache.phoenix.parse.EqualParseNode;
-import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
-import org.apache.phoenix.parse.IndexExpressionParseNodeRewriter;
import org.apache.phoenix.parse.JoinTableNode;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.OrderByNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.parse.ParseNodeRewriter;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.TableNode;
import org.apache.phoenix.parse.TableNodeVisitor;
import org.apache.phoenix.parse.TableWildcardParseNode;
-import org.apache.phoenix.parse.UDFParseNode;
-import org.apache.phoenix.parse.WildcardParseNode;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.LocalIndexDataColumnRef;
@@ -126,9 +119,8 @@ public class JoinCompiler {
private final ColumnResolver origResolver;
private final boolean useStarJoin;
private final Map<ColumnRef, ColumnRefType> columnRefs;
+ private final Map<ColumnRef, ColumnParseNode> columnNodes;
private final boolean useSortMergeJoin;
- private final boolean costBased;
-
private JoinCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) {
this.statement = statement;
@@ -136,9 +128,8 @@ public class JoinCompiler {
this.origResolver = resolver;
this.useStarJoin = !select.getHint().hasHint(Hint.NO_STAR_JOIN);
this.columnRefs = new HashMap<ColumnRef, ColumnRefType>();
+ this.columnNodes = new HashMap<ColumnRef, ColumnParseNode>();
this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
- this.costBased = statement.getConnection().getQueryServices().getProps().getBoolean(
- QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
}
public static JoinTable compile(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
@@ -173,6 +164,9 @@ public class JoinCompiler {
}
}
+ compiler.columnNodes.putAll(joinLocalRefVisitor.getColumnRefMap());
+ compiler.columnNodes.putAll(generalRefVisitor.getColumnRefMap());
+
for (ColumnRef ref : generalRefVisitor.getColumnRefMap().keySet()) {
compiler.columnRefs.put(ref, ColumnRefType.GENERAL);
}
@@ -196,8 +190,8 @@ public class JoinCompiler {
@Override
public Pair<Table, List<JoinSpec>> visit(BindTableNode boundTableNode) throws SQLException {
TableRef tableRef = resolveTable(boundTableNode.getAlias(), boundTableNode.getName());
- List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver);
- Table table = new Table(boundTableNode, Collections.<ColumnDef>emptyList(), boundTableNode.getTableSamplingRate(), selectNodes, tableRef);
+ boolean isWildCard = isWildCardSelectForTable(select.getSelect(), tableRef, origResolver);
+ Table table = new Table(boundTableNode, isWildCard, Collections.<ColumnDef>emptyList(), boundTableNode.getTableSamplingRate(), tableRef);
return new Pair<Table, List<JoinSpec>>(table, null);
}
@@ -219,8 +213,8 @@ public class JoinCompiler {
public Pair<Table, List<JoinSpec>> visit(NamedTableNode namedTableNode)
throws SQLException {
TableRef tableRef = resolveTable(namedTableNode.getAlias(), namedTableNode.getName());
- List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver);
- Table table = new Table(namedTableNode, namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate(), selectNodes, tableRef);
+ boolean isWildCard = isWildCardSelectForTable(select.getSelect(), tableRef, origResolver);
+ Table table = new Table(namedTableNode, isWildCard, namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate(), tableRef);
return new Pair<Table, List<JoinSpec>>(table, null);
}
@@ -228,8 +222,8 @@ public class JoinCompiler {
public Pair<Table, List<JoinSpec>> visit(DerivedTableNode subselectNode)
throws SQLException {
TableRef tableRef = resolveTable(subselectNode.getAlias(), null);
- List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver);
- Table table = new Table(subselectNode, selectNodes, tableRef);
+ boolean isWildCard = isWildCardSelectForTable(select.getSelect(), tableRef, origResolver);
+ Table table = new Table(subselectNode, isWildCard, tableRef);
return new Pair<Table, List<JoinSpec>>(table, null);
}
}
@@ -666,36 +660,35 @@ public class JoinCompiler {
public class Table {
private final TableNode tableNode;
+ private final boolean isWildcard;
private final List<ColumnDef> dynamicColumns;
private final Double tableSamplingRate;
private final SelectStatement subselect;
private final TableRef tableRef;
- private final List<AliasedNode> selectNodes; // all basic nodes related to this table, no aggregation.
private final List<ParseNode> preFilters;
private final List<ParseNode> postFilters;
private final boolean isPostFilterConvertible;
- private Table(TableNode tableNode, List<ColumnDef> dynamicColumns, Double tableSamplingRate,
- List<AliasedNode> selectNodes, TableRef tableRef) {
+ private Table(TableNode tableNode, boolean isWildcard, List<ColumnDef> dynamicColumns,
+ Double tableSamplingRate, TableRef tableRef) {
this.tableNode = tableNode;
+ this.isWildcard = isWildcard;
this.dynamicColumns = dynamicColumns;
this.tableSamplingRate=tableSamplingRate;
this.subselect = null;
this.tableRef = tableRef;
- this.selectNodes = selectNodes;
this.preFilters = new ArrayList<ParseNode>();
this.postFilters = Collections.<ParseNode>emptyList();
this.isPostFilterConvertible = false;
}
- private Table(DerivedTableNode tableNode,
- List<AliasedNode> selectNodes, TableRef tableRef) throws SQLException {
+ private Table(DerivedTableNode tableNode, boolean isWildcard, TableRef tableRef) throws SQLException {
this.tableNode = tableNode;
+ this.isWildcard = isWildcard;
this.dynamicColumns = Collections.<ColumnDef>emptyList();
this.tableSamplingRate=ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE;
this.subselect = SubselectRewriter.flatten(tableNode.getSelect(), statement.getConnection());
this.tableRef = tableRef;
- this.selectNodes = selectNodes;
this.preFilters = new ArrayList<ParseNode>();
this.postFilters = new ArrayList<ParseNode>();
this.isPostFilterConvertible = SubselectRewriter.isPostFilterConvertible(subselect);
@@ -717,8 +710,24 @@ public class JoinCompiler {
return subselect != null;
}
+ /**
+ * Returns all the basic select nodes, no aggregation.
+ */
public List<AliasedNode> getSelectNodes() {
- return selectNodes;
+ if (isWildCardSelect()) {
+ return Collections.singletonList(NODE_FACTORY.aliasedNode(null, NODE_FACTORY.wildcard()));
+ }
+
+ List<AliasedNode> ret = new ArrayList<AliasedNode>();
+ for (Map.Entry<ColumnRef, ColumnParseNode> entry : columnNodes.entrySet()) {
+ if (tableRef.equals(entry.getKey().getTableRef())) {
+ ret.add(NODE_FACTORY.aliasedNode(null, entry.getValue()));
+ }
+ }
+ if (ret.isEmpty()) {
+ ret.add(NODE_FACTORY.aliasedNode(null, NODE_FACTORY.literal(1)));
+ }
+ return ret;
}
public List<ParseNode> getPreFilters() {
@@ -757,11 +766,61 @@ public class JoinCompiler {
tableNode.getAlias(),
tableNode);
- return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null,
+ return NODE_FACTORY.select(tableNode, select.getHint(), false, getSelectNodes(), getPreFiltersCombined(), null,
null, orderBy, null, null, 0, false, select.hasSequence(),
Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
}
+ public SelectStatement getAsSubqueryForOptimization(boolean applyGroupByOrOrderBy) throws SQLException {
+ assert (!isSubselect());
+
+ SelectStatement query = getAsSubquery(null);
+ if (!applyGroupByOrOrderBy)
+ return query;
+
+ boolean addGroupBy = false;
+ boolean addOrderBy = false;
+ if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) {
+ ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(origResolver, statement.getConnection());
+ for (ParseNode node : select.getGroupBy()) {
+ node.accept(groupByVisitor);
+ }
+ Set<TableRef> set = groupByVisitor.getTableRefSet();
+ if (set.size() == 1 && tableRef.equals(set.iterator().next())) {
+ addGroupBy = true;
+ }
+ } else if (select.getOrderBy() != null && !select.getOrderBy().isEmpty()) {
+ ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(origResolver, statement.getConnection());
+ for (OrderByNode node : select.getOrderBy()) {
+ node.getNode().accept(orderByVisitor);
+ }
+ Set<TableRef> set = orderByVisitor.getTableRefSet();
+ if (set.size() == 1 && tableRef.equals(set.iterator().next())) {
+ addOrderBy = true;
+ }
+ }
+
+ if (!addGroupBy && !addOrderBy)
+ return query;
+
+ List<AliasedNode> selectList = query.getSelect();
+ if (addGroupBy) {
+ assert (!isWildCardSelect());
+ selectList = new ArrayList<AliasedNode>(query.getSelect().size());
+ for (AliasedNode aliasedNode : query.getSelect()) {
+ ParseNode node = NODE_FACTORY.function(
+ MinAggregateFunction.NAME, Collections.singletonList(aliasedNode.getNode()));
+ selectList.add(NODE_FACTORY.aliasedNode(null, node));
+ }
+ }
+
+ return NODE_FACTORY.select(query.getFrom(), query.getHint(), query.isDistinct(), selectList,
+ query.getWhere(), addGroupBy ? select.getGroupBy() : query.getGroupBy(),
+ addGroupBy ? null : query.getHaving(), addOrderBy ? select.getOrderBy() : query.getOrderBy(),
+ query.getLimit(), query.getOffset(), query.getBindCount(), addGroupBy, query.hasSequence(),
+ query.getSelects(), query.getUdfParseNodes());
+ }
+
public boolean hasFilters() {
return isSubselect() ? (!postFilters.isEmpty() || subselect.getWhere() != null || subselect.getHaving() != null) : !preFilters.isEmpty();
}
@@ -771,7 +830,7 @@ public class JoinCompiler {
}
protected boolean isWildCardSelect() {
- return (selectNodes.size() == 1 && selectNodes.get(0).getNode() instanceof TableWildcardParseNode);
+ return isWildcard;
}
public void projectColumns(Scan scan) {
@@ -1154,35 +1213,19 @@ public class JoinCompiler {
return NODE_FACTORY.and(nodes);
}
- private List<AliasedNode> extractFromSelect(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException {
- List<AliasedNode> ret = new ArrayList<AliasedNode>();
+ private boolean isWildCardSelectForTable(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException {
ColumnRefParseNodeVisitor visitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
for (AliasedNode aliasedNode : select) {
ParseNode node = aliasedNode.getNode();
if (node instanceof TableWildcardParseNode) {
TableName tableName = ((TableWildcardParseNode) node).getTableName();
if (tableRef.equals(resolver.resolveTable(tableName.getSchemaName(), tableName.getTableName()))) {
- ret.clear();
- ret.add(aliasedNode);
- return ret;
+ return true;
}
- continue;
- }
- node.accept(visitor);
- ColumnRefParseNodeVisitor.ColumnRefType type = visitor.getContentType(Collections.singletonList(tableRef));
- if (type == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
- ret.add(aliasedNode);
- } else if (type == ColumnRefParseNodeVisitor.ColumnRefType.COMPLEX) {
- for (Map.Entry<ColumnRef, ColumnParseNode> entry : visitor.getColumnRefMap().entrySet()) {
- if (entry.getKey().getTableRef().equals(tableRef)) {
- ret.add(NODE_FACTORY.aliasedNode(null, entry.getValue()));
- }
- }
}
- visitor.reset();
}
- return ret;
+ return false;
}
private static Expression compilePostFilterExpression(StatementContext context, List<ParseNode> postFilters) throws SQLException {
@@ -1203,167 +1246,6 @@ public class JoinCompiler {
return AndExpression.create(expressions);
}
- public static Pair<SelectStatement, Map<TableRef, QueryPlan>> optimize(
- PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
- TableRef groupByTableRef = null;
- TableRef orderByTableRef = null;
- if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) {
- ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
- for (ParseNode node : select.getGroupBy()) {
- node.accept(groupByVisitor);
- }
- Set<TableRef> set = groupByVisitor.getTableRefSet();
- if (set.size() == 1) {
- groupByTableRef = set.iterator().next();
- }
- } else if (select.getOrderBy() != null && !select.getOrderBy().isEmpty()) {
- ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
- for (OrderByNode node : select.getOrderBy()) {
- node.getNode().accept(orderByVisitor);
- }
- Set<TableRef> set = orderByVisitor.getTableRefSet();
- if (set.size() == 1) {
- orderByTableRef = set.iterator().next();
- }
- }
- JoinTable join = compile(statement, select, resolver);
- if (groupByTableRef != null || orderByTableRef != null) {
- QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false, null);
- List<Object> binds = statement.getParameters();
- StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
- QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null, Collections.<TableRef, QueryPlan>emptyMap());
- TableRef table = plan.getTableRef();
- if (groupByTableRef != null && !groupByTableRef.equals(table)) {
- groupByTableRef = null;
- }
- if (orderByTableRef != null && !orderByTableRef.equals(table)) {
- orderByTableRef = null;
- }
- }
-
- Map<TableRef, TableRef> replacementMap = null;
- Map<TableRef, QueryPlan> dataPlanMap = null;
-
- for (Table table : join.getTables()) {
- if (table.isSubselect())
- continue;
- TableRef tableRef = table.getTableRef();
- List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
- List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
- SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes());
- // TODO: It seems inefficient to be recompiling the statement again and again inside of this optimize call
- QueryPlan dataPlan =
- new QueryCompiler(
- statement, stmt,
- FromCompiler.getResolverForQuery(stmt, statement.getConnection()),
- false, null)
- .compile();
- QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, dataPlan);
- TableRef newTableRef = plan.getTableRef();
- if (!newTableRef.equals(tableRef)) {
- if (replacementMap == null) {
- replacementMap = new HashMap<TableRef, TableRef>();
- dataPlanMap = new HashMap<TableRef, QueryPlan>();
- }
- replacementMap.put(tableRef, newTableRef);
- dataPlanMap.put(newTableRef, dataPlan);
- }
- }
-
- if (replacementMap == null)
- return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(
- select, Collections.<TableRef, QueryPlan> emptyMap());
-
- final Map<TableRef, TableRef> replacement = replacementMap;
- TableNode from = select.getFrom();
- TableNode newFrom = from.accept(new TableNodeVisitor<TableNode>() {
- private TableRef resolveTable(String alias, TableName name) throws SQLException {
- if (alias != null)
- return resolver.resolveTable(null, alias);
-
- return resolver.resolveTable(name.getSchemaName(), name.getTableName());
- }
-
- private TableName getReplacedTableName(TableRef tableRef) {
- String schemaName = tableRef.getTable().getSchemaName().getString();
- return TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
- }
-
- @Override
- public TableNode visit(BindTableNode boundTableNode) throws SQLException {
- TableRef tableRef = resolveTable(boundTableNode.getAlias(), boundTableNode.getName());
- TableRef replaceRef = replacement.get(tableRef);
- if (replaceRef == null)
- return boundTableNode;
-
- String alias = boundTableNode.getAlias();
- return NODE_FACTORY.bindTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef));
- }
-
- @Override
- public TableNode visit(JoinTableNode joinNode) throws SQLException {
- TableNode lhs = joinNode.getLHS();
- TableNode rhs = joinNode.getRHS();
- TableNode lhsReplace = lhs.accept(this);
- TableNode rhsReplace = rhs.accept(this);
- if (lhs == lhsReplace && rhs == rhsReplace)
- return joinNode;
-
- return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode(), joinNode.isSingleValueOnly());
- }
-
- @Override
- public TableNode visit(NamedTableNode namedTableNode)
- throws SQLException {
- TableRef tableRef = resolveTable(namedTableNode.getAlias(), namedTableNode.getName());
- TableRef replaceRef = replacement.get(tableRef);
- if (replaceRef == null)
- return namedTableNode;
-
- String alias = namedTableNode.getAlias();
- return NODE_FACTORY.namedTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef), namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate());
- }
-
- @Override
- public TableNode visit(DerivedTableNode subselectNode)
- throws SQLException {
- return subselectNode;
- }
- });
-
- SelectStatement indexSelect = IndexStatementRewriter.translate(NODE_FACTORY.select(select, newFrom), resolver, replacement);
- for ( TableRef indexTableRef : replacement.values()) {
- // replace expressions with corresponding matching columns for functional indexes
- indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), statement.getConnection(), indexSelect.getUdfParseNodes()));
- }
- return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(indexSelect, dataPlanMap);
- }
-
- private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, Double tableSamplingRate, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,
- List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence, Map<String, UDFParseNode> udfParseNodes) {
- String schemaName = tableRef.getTable().getSchemaName().getString();
- TableName tName = TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
- List<AliasedNode> selectList = new ArrayList<AliasedNode>();
- if (isWildCardSelect) {
- selectList.add(NODE_FACTORY.aliasedNode(null, WildcardParseNode.INSTANCE));
- } else {
- for (ColumnRef colRef : columnRefs.keySet()) {
- if (colRef.getTableRef().equals(tableRef)) {
- ParseNode node = NODE_FACTORY.column(tName, '"' + colRef.getColumn().getName().getString() + '"', null);
- if (groupBy != null) {
- node = NODE_FACTORY.function(CountAggregateFunction.NAME, Collections.singletonList(node));
- }
- selectList.add(NODE_FACTORY.aliasedNode(null, node));
- }
- }
- }
- String tableAlias = tableRef.getTableAlias();
- TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : '"' + tableAlias + '"', tName, dynamicCols,tableSamplingRate);
-
- return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, null, 0,
- groupBy != null, hasSequence, Collections.<SelectStatement> emptyList(), udfParseNodes);
- }
-
public static PTable joinProjectedTables(PTable left, PTable right, JoinType type) throws SQLException {
Preconditions.checkArgument(left.getType() == PTableType.PROJECTED);
Preconditions.checkArgument(right.getType() == PTableType.PROJECTED);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 729e439..9568ad8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -108,14 +108,15 @@ public class QueryCompiler {
private final SequenceManager sequenceManager;
private final boolean projectTuples;
private final boolean noChildParentJoinOptimization;
- private final QueryPlan dataPlan;
+ private final boolean optimizeSubquery;
+ private final Map<TableRef, QueryPlan> dataPlans;
private final boolean costBased;
- public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
- this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan);
+ public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
+ this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, optimizeSubquery, dataPlans);
}
- public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
+ public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
this.statement = statement;
this.select = select;
this.resolver = resolver;
@@ -136,11 +137,12 @@ public class QueryCompiler {
scan.setCaching(statement.getFetchSize());
this.originalScan = ScanUtil.newScan(scan);
- this.dataPlan = dataPlan;
+ this.optimizeSubquery = optimizeSubquery;
+ this.dataPlans = dataPlans == null ? Collections.<TableRef, QueryPlan>emptyMap() : dataPlans;
}
public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException {
- this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true, null);
+ this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true, false, null);
}
/**
@@ -184,7 +186,7 @@ public class QueryCompiler {
select.hasWildcard() ? null : select.getSelect());
ColumnResolver resolver = FromCompiler.getResolver(tableRef);
StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
- QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false, null);
+ QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false);
plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(),
plan.getOffset(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans,
context.getBindManager().getParameterMetaData());
@@ -195,18 +197,10 @@ public class QueryCompiler {
List<Object> binds = statement.getParameters();
StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
if (select.isJoin()) {
- Pair<SelectStatement, Map<TableRef, QueryPlan>> optimized =
- JoinCompiler.optimize(statement, select, resolver);
- SelectStatement optimizedSelect = optimized.getFirst();
- if (select != optimizedSelect) {
- ColumnResolver resolver = FromCompiler.getResolverForQuery(optimizedSelect, statement.getConnection());
- context = new StatementContext(statement, resolver, scan, sequenceManager);
- }
- JoinTable joinTable = JoinCompiler.compile(statement, optimizedSelect, context.getResolver());
- return compileJoinQuery(
- context, binds, joinTable, false, false, null, optimized.getSecond());
+ JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
+ return compileJoinQuery(context, binds, joinTable, false, false, null);
} else {
- return compileSingleQuery(context, select, binds, false, true, dataPlan);
+ return compileSingleQuery(context, select, binds, false, true);
}
}
@@ -219,7 +213,7 @@ public class QueryCompiler {
* 2) Otherwise, return the join plan compiled with the default strategy.
* @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
*/
- protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
+ protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
if (joinTable.getJoinSpecs().isEmpty()) {
Table table = joinTable.getTable();
SelectStatement subquery = table.getAsSubquery(orderBy);
@@ -230,8 +224,7 @@ public class QueryCompiler {
TupleProjector.serializeProjectorIntoScan(context.getScan(), projector);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
table.projectColumns(context.getScan());
- QueryPlan dataPlan = dataPlans.get(table.getTableRef());
- return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true, dataPlan);
+ return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true);
}
QueryPlan plan = compileSubquery(subquery, false);
PTable projectedTable = table.createProjectedTable(plan.getProjector());
@@ -243,7 +236,7 @@ public class QueryCompiler {
assert strategies.size() > 0;
if (!costBased || strategies.size() == 1) {
return compileJoinQuery(
- strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
+ strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
}
QueryPlan bestPlan = null;
@@ -252,7 +245,7 @@ public class QueryCompiler {
StatementContext newContext = new StatementContext(
context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
QueryPlan plan = compileJoinQuery(
- strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
+ strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
Cost cost = plan.getCost();
if (bestPlan == null || cost.compareTo(bestCost) < 0) {
bestPlan = plan;
@@ -264,7 +257,7 @@ public class QueryCompiler {
return bestPlan;
}
- protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
+ protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
byte[] emptyByteArray = new byte[0];
List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
switch (strategy) {
@@ -307,7 +300,7 @@ public class QueryCompiler {
JoinSpec joinSpec = joinSpecs.get(i);
Scan subScan = ScanUtil.newScan(originalScan);
subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null, dataPlans);
+ subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
if (hasPostReference) {
tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
@@ -334,8 +327,7 @@ public class QueryCompiler {
hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
}
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
- QueryPlan dataPlan = dataPlans.get(tableRef);
- QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
+ QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
Integer limit = null;
Integer offset = null;
@@ -355,7 +347,7 @@ public class QueryCompiler {
JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
Scan subScan = ScanUtil.newScan(originalScan);
StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null, dataPlans);
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
PTable rhsProjTable;
TableRef rhsTableRef;
SelectStatement rhs;
@@ -388,8 +380,7 @@ public class QueryCompiler {
PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
- QueryPlan dataPlan = dataPlans.get(rhsTableRef);
- QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
+ QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
Integer limit = null;
Integer offset = null;
@@ -426,13 +417,13 @@ public class QueryCompiler {
Scan lhsScan = ScanUtil.newScan(originalScan);
StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
- QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy, dataPlans);
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
Scan rhsScan = ScanUtil.newScan(originalScan);
StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
- QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy, dataPlans);
+ QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
@@ -459,7 +450,7 @@ public class QueryCompiler {
joinTable.getStatement().getUdfParseNodes())
: NODE_FACTORY.select(joinTable.getStatement(), from, where);
- return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder, null);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
}
default:
throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
@@ -512,16 +503,18 @@ public class QueryCompiler {
}
int maxRows = this.statement.getMaxRows();
this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite maxRows to avoid its impact on inner queries.
- QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, null).compile();
- plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+ QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, optimizeSubquery, null).compile();
+ if (optimizeSubquery) {
+ plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+ }
this.statement.setMaxRows(maxRows); // restore maxRows.
return plan;
}
- protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan dataPlan) throws SQLException{
+ protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
SelectStatement innerSelect = select.getInnerSelectStatement();
if (innerSelect == null) {
- return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true, dataPlan);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true);
}
QueryPlan innerPlan = compileSubquery(innerSelect, false);
@@ -536,10 +529,10 @@ public class QueryCompiler {
context.setCurrentTable(tableRef);
boolean isInRowKeyOrder = innerPlan.getGroupBy() == GroupBy.EMPTY_GROUP_BY && innerPlan.getOrderBy() == OrderBy.EMPTY_ORDER_BY;
- return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder, null);
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder);
}
- protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder, QueryPlan dataPlan) throws SQLException{
+ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{
PTable projectedTable = null;
if (this.projectTuples) {
projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);
@@ -596,6 +589,7 @@ public class QueryCompiler {
}
QueryPlan plan = innerPlan;
+ QueryPlan dataPlan = dataPlans.get(tableRef);
if (plan == null) {
ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory;
plan = select.getFrom() == null
@@ -607,6 +601,7 @@ public class QueryCompiler {
: new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy,
parallelIteratorFactory, allowPageFilter, dataPlan));
}
+ SelectStatement planSelect = asSubquery ? select : this.select;
if (!subqueries.isEmpty()) {
int count = subqueries.size();
WhereClauseSubPlan[] subPlans = new WhereClauseSubPlan[count];
@@ -615,7 +610,7 @@ public class QueryCompiler {
SelectStatement stmt = subqueryNode.getSelectNode();
subPlans[i++] = new WhereClauseSubPlan(compileSubquery(stmt, false), stmt, subqueryNode.expectSingleRow());
}
- plan = HashJoinPlan.create(select, plan, null, subPlans);
+ plan = HashJoinPlan.create(planSelect, plan, null, subPlans);
}
if (innerPlan != null) {
@@ -623,9 +618,9 @@ public class QueryCompiler {
where = null; // we do not pass "true" as filter
}
plan = select.isAggregate() || select.isDistinct()
- ? new ClientAggregatePlan(context, select, tableRef, projector, limit, offset, where, orderBy,
+ ? new ClientAggregatePlan(context, planSelect, tableRef, projector, limit, offset, where, orderBy,
groupBy, having, plan)
- : new ClientScanPlan(context, select, tableRef, projector, limit, offset, where, orderBy, plan);
+ : new ClientScanPlan(context, planSelect, tableRef, projector, limit, offset, where, orderBy, plan);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index a926e06..af19ed1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -47,6 +47,11 @@ import org.apache.phoenix.util.SchemaUtil;
import com.google.common.collect.Lists;
+/*
+ * Class for flattening derived-tables when possible. A derived-table can be
+ * flattened if the merged statement preserves the same semantics as the original
+ * statement.
+ */
public class SubselectRewriter extends ParseNodeRewriter {
private final String tableAlias;
private final Map<String, ParseNode> aliasMap;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 92e3b8a..1a9a686 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -549,7 +549,7 @@ public class UpsertCompiler {
select = SelectStatement.create(select, hint);
// Pass scan through if same table in upsert and select so that projection is computed correctly
// Use optimizer to choose the best plan
- QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false, null);
+ QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false, false, null);
queryPlanToBe = compiler.compile();
// This is post-fix: if the tableRef is a projected table, this means there are post-processing
// steps and parallelIteratorFactory did not take effect.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 1058400..4a692d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -477,7 +477,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
select = StatementNormalizer.normalize(transformedSelect, resolver);
}
- QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true, null).compile();
+ QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true, false, null).compile();
plan.getContext().getSequenceManager().validateSequences(seqAction);
return plan;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java
new file mode 100644
index 0000000..567e92e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import org.apache.phoenix.compile.ExpressionCompiler;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.ArrayAllComparisonNode;
+import org.apache.phoenix.parse.ArrayAnyComparisonNode;
+import org.apache.phoenix.parse.ComparisonParseNode;
+import org.apache.phoenix.parse.ExistsParseNode;
+import org.apache.phoenix.parse.InParseNode;
+import org.apache.phoenix.parse.OrParseNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.ParseNodeRewriter;
+import org.apache.phoenix.parse.SubqueryParseNode;
+import org.apache.phoenix.schema.types.PDataType;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Creates a new WHERE clause by replaces non-correlated sub-queries with dummy values.
+ *
+ * Note that this class does not check the presence of correlation, thus it should only
+ * be used after de-correlation has been performed.
+ */
+public class GenSubqueryParamValuesRewriter extends ParseNodeRewriter {
+ private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
+
+ private final ExpressionCompiler expressionCompiler;
+
+ public static ParseNode replaceWithDummyValues(
+ ParseNode where, StatementContext context) throws SQLException {
+ return rewrite(where, new GenSubqueryParamValuesRewriter(context));
+ }
+
+ private GenSubqueryParamValuesRewriter(StatementContext context) {
+ this.expressionCompiler = new ExpressionCompiler(context);
+ }
+
+ protected List<ParseNode> generateDummyValues(
+ ParseNode lhs, boolean multipleValues) throws SQLException {
+ Expression expr = lhs.accept(expressionCompiler);
+ PDataType type = expr.getDataType();
+ if (!multipleValues) {
+ return Arrays.<ParseNode> asList(NODE_FACTORY.literal(type.getSampleValue(), type));
+ }
+
+ return Arrays.<ParseNode> asList(
+ NODE_FACTORY.literal(type.getSampleValue(), type),
+ NODE_FACTORY.literal(type.getSampleValue(), type),
+ NODE_FACTORY.literal(type.getSampleValue(), type));
+ }
+
+ @Override
+ public ParseNode visitLeave(AndParseNode node, List<ParseNode> l) throws SQLException {
+ return leaveCompoundNode(node, l, new CompoundNodeFactory() {
+ @Override
+ public ParseNode createNode(List<ParseNode> children) {
+ if (children.isEmpty()) {
+ return null;
+ }
+ if (children.size() == 1) {
+ return children.get(0);
+ }
+ return NODE_FACTORY.and(children);
+ }
+ });
+ }
+
+ @Override
+ public ParseNode visitLeave(OrParseNode node, List<ParseNode> l) throws SQLException {
+ return leaveCompoundNode(node, l, new CompoundNodeFactory() {
+ @Override
+ public ParseNode createNode(List<ParseNode> children) {
+ if (children.isEmpty()) {
+ return null;
+ }
+ if (children.size() == 1) {
+ return children.get(0);
+ }
+ return NODE_FACTORY.or(children);
+ }
+ });
+ }
+
+ @Override
+ public ParseNode visitLeave(InParseNode node, List<ParseNode> l) throws SQLException {
+ ParseNode lhs = l.get(0);
+ List<ParseNode> inList = generateDummyValues(lhs, true);
+ List<ParseNode> children = new ArrayList<ParseNode>();
+ children.add(lhs);
+ children.addAll(inList);
+ return NODE_FACTORY.inList(children, node.isNegate());
+ }
+
+ @Override
+ public ParseNode visitLeave(ExistsParseNode node, List<ParseNode> l) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> l) throws SQLException {
+ if (!(l.get(1) instanceof SubqueryParseNode)) {
+ super.visitLeave(node, l);
+ }
+
+ ParseNode lhs = l.get(0);
+ List<ParseNode> rhs = generateDummyValues(lhs, false);
+ List<ParseNode> children = new ArrayList<ParseNode>();
+ children.add(lhs);
+ children.add(rhs.get(0));
+ return super.visitLeave(node, children);
+ }
+
+ @Override
+ public ParseNode visitLeave(ArrayAnyComparisonNode node, List<ParseNode> l) throws SQLException {
+ ComparisonParseNode compare = (ComparisonParseNode) l.get(1);
+ ParseNode lhs = compare.getLHS();
+ List<ParseNode> rhs = generateDummyValues(lhs, false);
+
+ return NODE_FACTORY.comparison(compare.getFilterOp(), lhs, rhs.get(0));
+ }
+
+ @Override
+ public ParseNode visitLeave(ArrayAllComparisonNode node, List<ParseNode> l) throws SQLException {
+ ComparisonParseNode compare = (ComparisonParseNode) l.get(1);
+ ParseNode lhs = compare.getLHS();
+ List<ParseNode> rhs = generateDummyValues(lhs, false);
+
+ return NODE_FACTORY.comparison(compare.getFilterOp(), lhs, rhs.get(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 8481bc5..31f5c34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -21,33 +21,45 @@ package org.apache.phoenix.optimize;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.ExpressionCompiler;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.IndexStatementRewriter;
+import org.apache.phoenix.compile.JoinCompiler;
import org.apache.phoenix.compile.QueryCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.compile.SubqueryRewriter;
+import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.BindTableNode;
import org.apache.phoenix.parse.BooleanParseNodeVisitor;
import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.DerivedTableNode;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.IndexExpressionParseNodeRewriter;
+import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.ParseNodeRewriter;
import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.TableNodeVisitor;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -59,7 +71,6 @@ import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.Lists;
@@ -70,7 +81,7 @@ public class QueryOptimizer {
private final QueryServices services;
private final boolean useIndexes;
private final boolean costBased;
- private long indexPendingDisabledThreshold;
+ private final long indexPendingDisabledThreshold;
public QueryOptimizer(QueryServices services) {
this.services = services;
@@ -111,18 +122,81 @@ public class QueryOptimizer {
}
private List<QueryPlan> getApplicablePlans(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException {
+ if (!useIndexes) {
+ return Collections.singletonList(dataPlan);
+ }
+
+ if (dataPlan instanceof BaseQueryPlan) {
+ return getApplicablePlans((BaseQueryPlan) dataPlan, statement, targetColumns, parallelIteratorFactory, stopAtBestPlan);
+ }
+
+ SelectStatement select = (SelectStatement) dataPlan.getStatement();
+ ColumnResolver resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
+ Map<TableRef, QueryPlan> dataPlans = null;
+
+ // Find the optimal index plan for each join tables in a join query or a
+ // non-correlated sub-query, then rewrite the query with found index tables.
+ if (select.isJoin()
+ || (select.getWhere() != null && select.getWhere().hasSubquery())) {
+ JoinCompiler.JoinTable join = JoinCompiler.compile(statement, select, resolver);
+ Map<TableRef, TableRef> replacement = null;
+ for (JoinCompiler.Table table : join.getTables()) {
+ if (table.isSubselect())
+ continue;
+ TableRef tableRef = table.getTableRef();
+ SelectStatement stmt = table.getAsSubqueryForOptimization(tableRef.equals(dataPlan.getTableRef()));
+ // Replace non-correlated sub-queries in WHERE clause with dummy values
+ // so the filter conditions can be taken into account in optimization.
+ if (stmt.getWhere() != null && stmt.getWhere().hasSubquery()) {
+ StatementContext context =
+ new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));;
+ ParseNode dummyWhere = GenSubqueryParamValuesRewriter.replaceWithDummyValues(stmt.getWhere(), context);
+ stmt = FACTORY.select(stmt, dummyWhere);
+ }
+ // TODO: It seems inefficient to be recompiling the statement again inside of this optimize call
+ QueryPlan subDataPlan =
+ new QueryCompiler(
+ statement, stmt,
+ FromCompiler.getResolverForQuery(stmt, statement.getConnection()),
+ false, false, null)
+ .compile();
+ QueryPlan subPlan = optimize(statement, subDataPlan);
+ TableRef newTableRef = subPlan.getTableRef();
+ if (!newTableRef.equals(tableRef)) {
+ if (replacement == null) {
+ replacement = new HashMap<TableRef, TableRef>();
+ dataPlans = new HashMap<TableRef, QueryPlan>();
+ }
+ replacement.put(tableRef, newTableRef);
+ dataPlans.put(newTableRef, subDataPlan);
+ }
+ }
+
+ if (replacement != null) {
+ select = rewriteQueryWithIndexReplacement(
+ statement.getConnection(), resolver, select, replacement);
+ resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
+ }
+ }
+
+ // Re-compile the plan with option "optimizeSubquery" turned on, so that enclosed
+ // sub-queries can be optimized recursively.
+ QueryCompiler compiler = new QueryCompiler(statement, select, resolver,
+ targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(),
+ true, true, dataPlans);
+ return Collections.singletonList(compiler.compile());
+ }
+
+ private List<QueryPlan> getApplicablePlans(BaseQueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException {
SelectStatement select = (SelectStatement)dataPlan.getStatement();
// Exit early if we have a point lookup as we can't get better than that
- if (!useIndexes
- || (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan)) {
- return Collections.singletonList(dataPlan);
+ if (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan) {
+ return Collections.<QueryPlan> singletonList(dataPlan);
}
- // For single query tuple projection, indexes are inherited from the original table to the projected
- // table; otherwise not. So we pass projected table here, which is enough to tell if this is from a
- // single query or a part of join query.
- List<PTable>indexes = Lists.newArrayList(dataPlan.getContext().getResolver().getTables().get(0).getTable().getIndexes());
+
+ List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
if (indexes.isEmpty() || dataPlan.isDegenerate() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX)) {
- return Collections.singletonList(dataPlan);
+ return Collections.<QueryPlan> singletonList(dataPlan);
}
// The targetColumns is set for UPSERT SELECT to ensure that the proper type conversion takes place.
@@ -237,12 +311,13 @@ public class QueryOptimizer {
TableRef indexTableRef = resolver.getTables().get(0);
PTable indexTable = indexTableRef.getTable();
PIndexState indexState = indexTable.getIndexState();
+ Map<TableRef, QueryPlan> dataPlans = Collections.singletonMap(indexTableRef, dataPlan);
if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE
|| (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) {
try {
// translate nodes that match expressions that are indexed to the associated column parse node
indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes()));
- QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan);
+ QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, true, dataPlans);
QueryPlan plan = compiler.compile();
// If query doesn't have where clause and some of columns to project are missing
@@ -314,7 +389,7 @@ public class QueryOptimizer {
query = SubqueryRewriter.transform(query, queryResolver, statement.getConnection());
queryResolver = FromCompiler.getResolverForQuery(query, statement.getConnection());
query = StatementNormalizer.normalize(query, queryResolver);
- QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan).compile();
+ QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, true, dataPlans).compile();
return plan;
}
}
@@ -552,5 +627,76 @@ public class QueryOptimizer {
return node;
}
}
-
+
+ private static SelectStatement rewriteQueryWithIndexReplacement(
+ final PhoenixConnection connection, final ColumnResolver resolver,
+ final SelectStatement select, final Map<TableRef, TableRef> replacement) throws SQLException {
+ TableNode from = select.getFrom();
+ TableNode newFrom = from.accept(new TableNodeVisitor<TableNode>() {
+ private TableRef resolveTable(String alias, TableName name) throws SQLException {
+ if (alias != null)
+ return resolver.resolveTable(null, alias);
+
+ return resolver.resolveTable(name.getSchemaName(), name.getTableName());
+ }
+
+ private TableName getReplacedTableName(TableRef tableRef) {
+ String schemaName = tableRef.getTable().getSchemaName().getString();
+ return TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
+ }
+
+ @Override
+ public TableNode visit(BindTableNode boundTableNode) throws SQLException {
+ TableRef tableRef = resolveTable(boundTableNode.getAlias(), boundTableNode.getName());
+ TableRef replaceRef = replacement.get(tableRef);
+ if (replaceRef == null)
+ return boundTableNode;
+
+ String alias = boundTableNode.getAlias();
+ return FACTORY.bindTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef));
+ }
+
+ @Override
+ public TableNode visit(JoinTableNode joinNode) throws SQLException {
+ TableNode lhs = joinNode.getLHS();
+ TableNode rhs = joinNode.getRHS();
+ TableNode lhsReplace = lhs.accept(this);
+ TableNode rhsReplace = rhs.accept(this);
+ if (lhs == lhsReplace && rhs == rhsReplace)
+ return joinNode;
+
+ return FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode(), joinNode.isSingleValueOnly());
+ }
+
+ @Override
+ public TableNode visit(NamedTableNode namedTableNode)
+ throws SQLException {
+ TableRef tableRef = resolveTable(namedTableNode.getAlias(), namedTableNode.getName());
+ TableRef replaceRef = replacement.get(tableRef);
+ if (replaceRef == null)
+ return namedTableNode;
+
+ String alias = namedTableNode.getAlias();
+ return FACTORY.namedTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef), namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate());
+ }
+
+ @Override
+ public TableNode visit(DerivedTableNode subselectNode)
+ throws SQLException {
+ return subselectNode;
+ }
+ });
+
+ if (from == newFrom) {
+ return select;
+ }
+
+ SelectStatement indexSelect = IndexStatementRewriter.translate(FACTORY.select(select, newFrom), resolver, replacement);
+ for (TableRef indexTableRef : replacement.values()) {
+ // replace expressions with corresponding matching columns for functional indexes
+ indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), connection, indexSelect.getUdfParseNodes()));
+ }
+
+ return indexSelect;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 11f5f22..bac4ee8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
@@ -83,7 +84,9 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDecimal;
import org.apache.phoenix.schema.types.PInteger;
@@ -4309,7 +4312,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
assertEquals(e.getErrorCode(), SQLExceptionCode.CONNECTION_CLOSED.getErrorCode());
}
}
-
+
@Test
public void testSingleColLocalIndexPruning() throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -4656,6 +4659,115 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
}
}
+ @Test
+ public void testQueryPlanSourceRefsInHashJoin() throws SQLException {
+ String query = "SELECT * FROM (\n" +
+ " SELECT K1, V1 FROM A WHERE V1 = 'A'\n" +
+ ") T1 JOIN (\n" +
+ " SELECT K2, V2 FROM B WHERE V2 = 'B'\n" +
+ ") T2 ON K1 = K2 ORDER BY V1";
+ verifyQueryPlanSourceRefs(query, 2);
+ }
+
+ @Test
+ public void testQueryPlanSourceRefsInSortMergeJoin() throws SQLException {
+ String query = "SELECT * FROM (\n" +
+ " SELECT max(K1) KEY1, V1 FROM A GROUP BY V1\n" +
+ ") T1 JOIN (\n" +
+ " SELECT max(K2) KEY2, V2 FROM B GROUP BY V2\n" +
+ ") T2 ON KEY1 = KEY2 ORDER BY V1";
+ verifyQueryPlanSourceRefs(query, 2);
+ }
+
+ @Test
+ public void testQueryPlanSourceRefsInSubquery() throws SQLException {
+ String query = "SELECT * FROM A\n" +
+ "WHERE K1 > (\n" +
+ " SELECT max(K2) FROM B WHERE V2 = V1\n" +
+ ") ORDER BY V1";
+ verifyQueryPlanSourceRefs(query, 2);
+ }
+
+ @Test
+ public void testQueryPlanSourceRefsInSubquery2() throws SQLException {
+ String query = "SELECT * FROM A\n" +
+ "WHERE V1 > ANY (\n" +
+ " SELECT K2 FROM B WHERE V2 = 'B'\n" +
+ ")";
+ verifyQueryPlanSourceRefs(query, 2);
+ }
+
+ @Test
+ public void testQueryPlanSourceRefsInSubquery3() throws SQLException {
+ String query = "SELECT * FROM A\n" +
+ "WHERE V1 > ANY (\n" +
+ " SELECT K2 FROM B B1" +
+ " WHERE V2 = (\n" +
+ " SELECT max(V2) FROM B B2\n" +
+ " WHERE B2.K2 = B1.K2 AND V2 < 'K'\n" +
+ " )\n" +
+ ")";
+ verifyQueryPlanSourceRefs(query, 3);
+ }
+
+ @Test
+ public void testQueryPlanSourceRefsInSubquery4() throws SQLException {
+ String query = "SELECT * FROM (\n" +
+ " SELECT K1, K2 FROM A\n" +
+ " JOIN B ON K1 = K2\n" +
+ " WHERE V1 = 'A' AND V2 = 'B'\n" +
+ " LIMIT 10\n" +
+ ") ORDER BY K1";
+ verifyQueryPlanSourceRefs(query, 2);
+ }
+
+ @Test
+ public void testQueryPlanSourceRefsInSubquery5() throws SQLException {
+ String query = "SELECT * FROM (\n" +
+ " SELECT KEY1, KEY2 FROM (\n" +
+ " SELECT max(K1) KEY1, V1 FROM A GROUP BY V1\n" +
+ " ) T1 JOIN (\n" +
+ " SELECT max(K2) KEY2, V2 FROM B GROUP BY V2\n" +
+ " ) T2 ON KEY1 = KEY2 LIMIT 10\n" +
+ ") ORDER BY KEY1";
+ verifyQueryPlanSourceRefs(query, 2);
+ }
+
+ @Test
+ public void testQueryPlanSourceRefsInUnion() throws SQLException {
+ String query = "SELECT K1, V1 FROM A WHERE V1 = 'A'\n" +
+ "UNION ALL\n" +
+ "SELECT K2, V2 FROM B WHERE V2 = 'B'";
+ verifyQueryPlanSourceRefs(query, 2);
+ }
+
+ private void verifyQueryPlanSourceRefs(String query, int refCount) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE A (\n" +
+ " K1 VARCHAR(10) NOT NULL PRIMARY KEY,\n" +
+ " V1 VARCHAR(10))");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX1 ON A(V1)");
+ conn.createStatement().execute("CREATE TABLE B (\n" +
+ " K2 VARCHAR(10) NOT NULL PRIMARY KEY,\n" +
+ " V2 VARCHAR(10))");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX2 ON B(V2)");
+ PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = stmt.compileQuery(query);
+ Set<TableRef> sourceRefs = plan.getSourceRefs();
+ assertEquals(refCount, sourceRefs.size());
+ for (TableRef table : sourceRefs) {
+ assertTrue(table.getTable().getType() == PTableType.TABLE);
+ }
+ plan = stmt.optimizeQuery(query);
+ sourceRefs = plan.getSourceRefs();
+ assertEquals(refCount, sourceRefs.size());
+ for (TableRef table : sourceRefs) {
+ assertTrue(table.getTable().getType() == PTableType.INDEX);
+ }
+ }
+ }
+
private static class MultipleChildrenExtractor implements QueryPlanVisitor<List<QueryPlan>> {
@Override
[2/2] phoenix git commit: PHOENIX-4682
UngroupedAggregateRegionObserver preCompactScannerOpen hook should not throw
exceptions
Posted by pb...@apache.org.
PHOENIX-4682 UngroupedAggregateRegionObserver preCompactScannerOpen hook should not throw exceptions
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce3e5867
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce3e5867
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce3e5867
Branch: refs/heads/4.x-cdh5.11
Commit: ce3e5867eef9bf10038cc3729afdcfee1c27aa14
Parents: 98a8bbd
Author: Vincent Poon <vi...@apache.org>
Authored: Wed Apr 4 19:06:24 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Wed Apr 4 19:37:35 2018 +0100
----------------------------------------------------------------------
.../phoenix/end2end/index/MutableIndexIT.java | 49 ++++++++++--
.../UngroupedAggregateRegionObserver.java | 81 ++++++++++++--------
2 files changed, 88 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3e5867/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index efae15e..4b88b92 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -41,29 +41,26 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -867,6 +864,42 @@ public class MutableIndexIT extends ParallelStatsDisabledIT {
}
}
+ // some tables (e.g. indexes on views) have UngroupedAgg coproc loaded, but don't have a
+ // corresponding row in syscat. This tests that compaction isn't blocked
+ @Test(timeout=120000)
+ public void testCompactNonPhoenixTable() throws Exception {
+ try (Connection conn = getConnection()) {
+ // create a vanilla HBase table (non-Phoenix)
+ String randomTable = generateUniqueName();
+ TableName hbaseTN = TableName.valueOf(randomTable);
+ byte[] famBytes = Bytes.toBytes("fam");
+ HTable hTable = getUtility().createTable(hbaseTN, famBytes);
+ TestUtil.addCoprocessor(conn, randomTable, UngroupedAggregateRegionObserver.class);
+ Put put = new Put(Bytes.toBytes("row"));
+ byte[] value = new byte[1];
+ Bytes.random(value);
+ put.add(famBytes, Bytes.toBytes("colQ"), value);
+ hTable.put(put);
+ hTable.flushCommits();
+
+ // major compaction shouldn't cause a timeout or RS abort
+ List<HRegion> regions = getUtility().getHBaseCluster().getRegions(hbaseTN);
+ HRegion hRegion = regions.get(0);
+ hRegion.flush(true);
+ HStore store = (HStore) hRegion.getStore(famBytes);
+ store.triggerMajorCompaction();
+ store.compactRecentForTestingAssumingDefaultPolicy(1);
+
+ // we should be able to compact syscat itself as well
+ regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+ hRegion = regions.get(0);
+ hRegion.flush(true);
+ store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+ store.triggerMajorCompaction();
+ store.compactRecentForTestingAssumingDefaultPolicy(1);
+ }
+ }
+
private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException {
PreparedStatement stmt = tenantConn.prepareStatement(dml);
stmt.setString(1, "00000000000000" + String.valueOf(i));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3e5867/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 72ca58d..965ba1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -99,6 +99,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
@@ -112,6 +113,7 @@ import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -962,35 +964,36 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
@Override
- public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final InternalScanner scanner, final ScanType scanType) throws IOException {
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
- InternalScanner internalScanner = scanner;
- if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+ public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final InternalScanner scanner, final ScanType scanType)
+ throws IOException {
+ if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+ final TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+ // Compaction and split upcalls run with the effective user context of the requesting user.
+ // This will lead to failure of cross cluster RPC if the effective user is not
+ // the login user. Switch to the login user context to ensure we have the expected
+ // security context.
+ return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+ @Override public InternalScanner run() throws Exception {
+ InternalScanner internalScanner = scanner;
try {
long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
store.getFamily().getName());
internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner);
- } catch (IOException e) {
+ } catch (Exception e) {
// If we can't reach the stats table, don't interrupt the normal
- // compaction operation, just log a warning.
- if (logger.isWarnEnabled()) {
- logger.warn("Unable to collect stats for " + table, e);
- }
+ // compaction operation, just log a warning.
+ if (logger.isWarnEnabled()) {
+ logger.warn("Unable to collect stats for " + table, e);
+ }
}
+ return internalScanner;
}
- return internalScanner;
- }
- });
+ });
+ }
+ return scanner;
}
private static PTable deserializeTable(byte[] b) {
@@ -1362,23 +1365,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// This will lead to failure of cross cluster RPC if the effective user is not
// the login user. Switch to the login user context to ensure we have the expected
// security context.
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
- if (request.isMajor()) {
- String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
- try (PhoenixConnection conn =
- QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
- String baseTable = fullTableName;
- PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
+ final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+ // since we will make a call to syscat, do nothing if we are compacting syscat itself
+ if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
+ return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+ @Override
+ public InternalScanner run() throws Exception {
+ // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
+ try (PhoenixConnection conn =
+ QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+ PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes();
// FIXME need to handle views and indexes on views as well
for (PTable index : indexes) {
if (index.getIndexDisableTimestamp() != 0) {
logger.info(
"Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
- + baseTable);
+ + fullTableName);
Scan scan = new Scan();
scan.setMaxVersions();
return new StoreScanner(store, store.getScanInfo(), scan, scanners,
@@ -1386,10 +1389,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
HConstants.OLDEST_TIMESTAMP);
}
}
+ } catch (Exception e) {
+ if (e instanceof TableNotFoundException) {
+ logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
+ // non-Phoenix HBase tables won't be found, do nothing
+ } else {
+ logger.error("Unable to modify compaction scanner to retain deleted cells for a table with disabled Index; "
+ + fullTableName,
+ e);
+ }
}
+ return s;
}
- return s;
- }
- });
+ });
+ }
+ return s;
}
}