You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/04 18:39:10 UTC

[1/2] phoenix git commit: PHOENIX-4616 Move join query optimization out from QueryCompiler into QueryOptimizer

Repository: phoenix
Updated Branches:
  refs/heads/4.x-cdh5.11 0fa6d947e -> ce3e5867e


PHOENIX-4616 Move join query optimization out from QueryCompiler into QueryOptimizer


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/98a8bbd1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/98a8bbd1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/98a8bbd1

Branch: refs/heads/4.x-cdh5.11
Commit: 98a8bbd1c48badcd5a0e1ad729f9a2d98ebc1e3f
Parents: 0fa6d94
Author: maryannxue <ma...@gmail.com>
Authored: Wed Apr 4 01:16:10 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Wed Apr 4 19:25:35 2018 +0100

----------------------------------------------------------------------
 .../apache/phoenix/end2end/join/BaseJoinIT.java |   2 +
 .../apache/phoenix/compile/JoinCompiler.java    | 298 ++++++-------------
 .../apache/phoenix/compile/QueryCompiler.java   |  79 +++--
 .../phoenix/compile/SubselectRewriter.java      |   5 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   2 +-
 .../GenSubqueryParamValuesRewriter.java         | 153 ++++++++++
 .../apache/phoenix/optimize/QueryOptimizer.java | 172 ++++++++++-
 .../phoenix/compile/QueryCompilerTest.java      | 114 ++++++-
 9 files changed, 561 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
index 6e03a37..4d4660c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/BaseJoinIT.java
@@ -34,6 +34,7 @@ import java.util.regex.Pattern;
 
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -456,6 +457,7 @@ public abstract class BaseJoinIT extends ParallelStatsDisabledIT {
 	protected Connection getConnection() throws SQLException {
 		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 		props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true");
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, "true");
 		return DriverManager.getConnection(getUrl(), props);
 	}
 	

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index cf5a5dc..88e8f50 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -40,7 +40,7 @@ import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.expression.function.MinAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AliasedNode;
@@ -53,26 +53,19 @@ import org.apache.phoenix.parse.ComparisonParseNode;
 import org.apache.phoenix.parse.ConcreteTableNode;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.EqualParseNode;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
-import org.apache.phoenix.parse.IndexExpressionParseNodeRewriter;
 import org.apache.phoenix.parse.JoinTableNode;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.parse.ParseNodeRewriter;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.parse.TableNodeVisitor;
 import org.apache.phoenix.parse.TableWildcardParseNode;
-import org.apache.phoenix.parse.UDFParseNode;
-import org.apache.phoenix.parse.WildcardParseNode;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.LocalIndexDataColumnRef;
@@ -126,9 +119,8 @@ public class JoinCompiler {
     private final ColumnResolver origResolver;
     private final boolean useStarJoin;
     private final Map<ColumnRef, ColumnRefType> columnRefs;
+    private final Map<ColumnRef, ColumnParseNode> columnNodes;
     private final boolean useSortMergeJoin;
-    private final boolean costBased;
-
 
     private JoinCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) {
         this.statement = statement;
@@ -136,9 +128,8 @@ public class JoinCompiler {
         this.origResolver = resolver;
         this.useStarJoin = !select.getHint().hasHint(Hint.NO_STAR_JOIN);
         this.columnRefs = new HashMap<ColumnRef, ColumnRefType>();
+        this.columnNodes = new HashMap<ColumnRef, ColumnParseNode>();
         this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
-        this.costBased = statement.getConnection().getQueryServices().getProps().getBoolean(
-                QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
     }
 
     public static JoinTable compile(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
@@ -173,6 +164,9 @@ public class JoinCompiler {
             }
         }
 
+        compiler.columnNodes.putAll(joinLocalRefVisitor.getColumnRefMap());
+        compiler.columnNodes.putAll(generalRefVisitor.getColumnRefMap());
+
         for (ColumnRef ref : generalRefVisitor.getColumnRefMap().keySet()) {
             compiler.columnRefs.put(ref, ColumnRefType.GENERAL);
         }
@@ -196,8 +190,8 @@ public class JoinCompiler {
         @Override
         public Pair<Table, List<JoinSpec>> visit(BindTableNode boundTableNode) throws SQLException {
             TableRef tableRef = resolveTable(boundTableNode.getAlias(), boundTableNode.getName());
-            List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver);
-            Table table = new Table(boundTableNode, Collections.<ColumnDef>emptyList(), boundTableNode.getTableSamplingRate(), selectNodes, tableRef);
+            boolean isWildCard = isWildCardSelectForTable(select.getSelect(), tableRef, origResolver);
+            Table table = new Table(boundTableNode, isWildCard, Collections.<ColumnDef>emptyList(), boundTableNode.getTableSamplingRate(), tableRef);
             return new Pair<Table, List<JoinSpec>>(table, null);
         }
 
@@ -219,8 +213,8 @@ public class JoinCompiler {
         public Pair<Table, List<JoinSpec>> visit(NamedTableNode namedTableNode)
                 throws SQLException {
             TableRef tableRef = resolveTable(namedTableNode.getAlias(), namedTableNode.getName());
-            List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver);
-            Table table = new Table(namedTableNode, namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate(), selectNodes, tableRef);
+            boolean isWildCard = isWildCardSelectForTable(select.getSelect(), tableRef, origResolver);
+            Table table = new Table(namedTableNode, isWildCard, namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate(), tableRef);
             return new Pair<Table, List<JoinSpec>>(table, null);
         }
 
@@ -228,8 +222,8 @@ public class JoinCompiler {
         public Pair<Table, List<JoinSpec>> visit(DerivedTableNode subselectNode)
                 throws SQLException {
             TableRef tableRef = resolveTable(subselectNode.getAlias(), null);
-            List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver);
-            Table table = new Table(subselectNode, selectNodes, tableRef);
+            boolean isWildCard = isWildCardSelectForTable(select.getSelect(), tableRef, origResolver);
+            Table table = new Table(subselectNode, isWildCard, tableRef);
             return new Pair<Table, List<JoinSpec>>(table, null);
         }
     }
@@ -666,36 +660,35 @@ public class JoinCompiler {
 
     public class Table {
         private final TableNode tableNode;
+        private final boolean isWildcard;
         private final List<ColumnDef> dynamicColumns;
         private final Double tableSamplingRate;
         private final SelectStatement subselect;
         private final TableRef tableRef;
-        private final List<AliasedNode> selectNodes; // all basic nodes related to this table, no aggregation.
         private final List<ParseNode> preFilters;
         private final List<ParseNode> postFilters;
         private final boolean isPostFilterConvertible;
 
-        private Table(TableNode tableNode, List<ColumnDef> dynamicColumns, Double tableSamplingRate,
-                List<AliasedNode> selectNodes, TableRef tableRef) {
+        private Table(TableNode tableNode, boolean isWildcard, List<ColumnDef> dynamicColumns,
+                      Double tableSamplingRate, TableRef tableRef) {
             this.tableNode = tableNode;
+            this.isWildcard = isWildcard;
             this.dynamicColumns = dynamicColumns;
             this.tableSamplingRate=tableSamplingRate;
             this.subselect = null;
             this.tableRef = tableRef;
-            this.selectNodes = selectNodes;
             this.preFilters = new ArrayList<ParseNode>();
             this.postFilters = Collections.<ParseNode>emptyList();
             this.isPostFilterConvertible = false;
         }
 
-        private Table(DerivedTableNode tableNode,
-                List<AliasedNode> selectNodes, TableRef tableRef) throws SQLException {
+        private Table(DerivedTableNode tableNode, boolean isWildcard, TableRef tableRef) throws SQLException {
             this.tableNode = tableNode;
+            this.isWildcard = isWildcard;
             this.dynamicColumns = Collections.<ColumnDef>emptyList();
             this.tableSamplingRate=ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE;
             this.subselect = SubselectRewriter.flatten(tableNode.getSelect(), statement.getConnection());
             this.tableRef = tableRef;
-            this.selectNodes = selectNodes;
             this.preFilters = new ArrayList<ParseNode>();
             this.postFilters = new ArrayList<ParseNode>();
             this.isPostFilterConvertible = SubselectRewriter.isPostFilterConvertible(subselect);
@@ -717,8 +710,24 @@ public class JoinCompiler {
             return subselect != null;
         }
 
+        /**
+         * Returns all the basic select nodes, no aggregation.
+         */
         public List<AliasedNode> getSelectNodes() {
-            return selectNodes;
+            if (isWildCardSelect()) {
+                return Collections.singletonList(NODE_FACTORY.aliasedNode(null, NODE_FACTORY.wildcard()));
+            }
+
+            List<AliasedNode> ret = new ArrayList<AliasedNode>();
+            for (Map.Entry<ColumnRef, ColumnParseNode> entry : columnNodes.entrySet()) {
+                if (tableRef.equals(entry.getKey().getTableRef())) {
+                    ret.add(NODE_FACTORY.aliasedNode(null, entry.getValue()));
+                }
+            }
+            if (ret.isEmpty()) {
+                ret.add(NODE_FACTORY.aliasedNode(null, NODE_FACTORY.literal(1)));
+            }
+            return ret;
         }
 
         public List<ParseNode> getPreFilters() {
@@ -757,11 +766,61 @@ public class JoinCompiler {
                         tableNode.getAlias(),
                         tableNode);
 
-            return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null,
+            return NODE_FACTORY.select(tableNode, select.getHint(), false, getSelectNodes(), getPreFiltersCombined(), null,
                     null, orderBy, null, null, 0, false, select.hasSequence(),
                     Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
         }
 
+        public SelectStatement getAsSubqueryForOptimization(boolean applyGroupByOrOrderBy) throws SQLException {
+            assert (!isSubselect());
+
+            SelectStatement query = getAsSubquery(null);
+            if (!applyGroupByOrOrderBy)
+                return query;
+
+            boolean addGroupBy = false;
+            boolean addOrderBy = false;
+            if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) {
+                ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(origResolver, statement.getConnection());
+                for (ParseNode node : select.getGroupBy()) {
+                    node.accept(groupByVisitor);
+                }
+                Set<TableRef> set = groupByVisitor.getTableRefSet();
+                if (set.size() == 1 && tableRef.equals(set.iterator().next())) {
+                    addGroupBy = true;
+                }
+            } else if (select.getOrderBy() != null && !select.getOrderBy().isEmpty()) {
+                ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(origResolver, statement.getConnection());
+                for (OrderByNode node : select.getOrderBy()) {
+                    node.getNode().accept(orderByVisitor);
+                }
+                Set<TableRef> set = orderByVisitor.getTableRefSet();
+                if (set.size() == 1 && tableRef.equals(set.iterator().next())) {
+                    addOrderBy = true;
+                }
+            }
+
+            if (!addGroupBy && !addOrderBy)
+                return query;
+
+            List<AliasedNode> selectList = query.getSelect();
+            if (addGroupBy) {
+                assert (!isWildCardSelect());
+                selectList = new ArrayList<AliasedNode>(query.getSelect().size());
+                for (AliasedNode aliasedNode : query.getSelect()) {
+                    ParseNode node = NODE_FACTORY.function(
+                            MinAggregateFunction.NAME, Collections.singletonList(aliasedNode.getNode()));
+                    selectList.add(NODE_FACTORY.aliasedNode(null, node));
+                }
+            }
+
+            return NODE_FACTORY.select(query.getFrom(), query.getHint(), query.isDistinct(), selectList,
+                    query.getWhere(), addGroupBy ? select.getGroupBy() : query.getGroupBy(),
+                    addGroupBy ? null : query.getHaving(), addOrderBy ? select.getOrderBy() : query.getOrderBy(),
+                    query.getLimit(), query.getOffset(), query.getBindCount(), addGroupBy, query.hasSequence(),
+                    query.getSelects(), query.getUdfParseNodes());
+        }
+
         public boolean hasFilters() {
             return isSubselect() ? (!postFilters.isEmpty() || subselect.getWhere() != null || subselect.getHaving() != null) : !preFilters.isEmpty();
         }
@@ -771,7 +830,7 @@ public class JoinCompiler {
         }
 
         protected boolean isWildCardSelect() {
-            return (selectNodes.size() == 1 && selectNodes.get(0).getNode() instanceof TableWildcardParseNode);
+            return isWildcard;
         }
 
         public void projectColumns(Scan scan) {
@@ -1154,35 +1213,19 @@ public class JoinCompiler {
         return NODE_FACTORY.and(nodes);
     }
 
-    private List<AliasedNode> extractFromSelect(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException {
-        List<AliasedNode> ret = new ArrayList<AliasedNode>();
+    private boolean isWildCardSelectForTable(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException {
         ColumnRefParseNodeVisitor visitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
         for (AliasedNode aliasedNode : select) {
             ParseNode node = aliasedNode.getNode();
             if (node instanceof TableWildcardParseNode) {
                 TableName tableName = ((TableWildcardParseNode) node).getTableName();
                 if (tableRef.equals(resolver.resolveTable(tableName.getSchemaName(), tableName.getTableName()))) {
-                    ret.clear();
-                    ret.add(aliasedNode);
-                    return ret;
+                    return true;
                 }
-                continue;
-            }
 
-            node.accept(visitor);
-            ColumnRefParseNodeVisitor.ColumnRefType type = visitor.getContentType(Collections.singletonList(tableRef));
-            if (type == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
-                ret.add(aliasedNode);
-            } else if (type == ColumnRefParseNodeVisitor.ColumnRefType.COMPLEX) {
-                for (Map.Entry<ColumnRef, ColumnParseNode> entry : visitor.getColumnRefMap().entrySet()) {
-                    if (entry.getKey().getTableRef().equals(tableRef)) {
-                        ret.add(NODE_FACTORY.aliasedNode(null, entry.getValue()));
-                    }
-                }
             }
-            visitor.reset();
         }
-        return ret;
+        return false;
     }
 
     private static Expression compilePostFilterExpression(StatementContext context, List<ParseNode> postFilters) throws SQLException {
@@ -1203,167 +1246,6 @@ public class JoinCompiler {
         return AndExpression.create(expressions);
     }
 
-    public static Pair<SelectStatement, Map<TableRef, QueryPlan>> optimize(
-            PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
-        TableRef groupByTableRef = null;
-        TableRef orderByTableRef = null;
-        if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) {
-            ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
-            for (ParseNode node : select.getGroupBy()) {
-                node.accept(groupByVisitor);
-            }
-            Set<TableRef> set = groupByVisitor.getTableRefSet();
-            if (set.size() == 1) {
-                groupByTableRef = set.iterator().next();
-            }
-        } else if (select.getOrderBy() != null && !select.getOrderBy().isEmpty()) {
-            ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
-            for (OrderByNode node : select.getOrderBy()) {
-                node.getNode().accept(orderByVisitor);
-            }
-            Set<TableRef> set = orderByVisitor.getTableRefSet();
-            if (set.size() == 1) {
-                orderByTableRef = set.iterator().next();
-            }
-        }
-        JoinTable join = compile(statement, select, resolver);
-        if (groupByTableRef != null || orderByTableRef != null) {
-            QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false, null);
-            List<Object> binds = statement.getParameters();
-            StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
-            QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null, Collections.<TableRef, QueryPlan>emptyMap());
-            TableRef table = plan.getTableRef();
-            if (groupByTableRef != null && !groupByTableRef.equals(table)) {
-                groupByTableRef = null;
-            }
-            if (orderByTableRef != null && !orderByTableRef.equals(table)) {
-                orderByTableRef = null;
-            }
-        }
-
-        Map<TableRef, TableRef> replacementMap = null;
-        Map<TableRef, QueryPlan> dataPlanMap = null;
-
-        for (Table table : join.getTables()) {
-            if (table.isSubselect())
-                continue;
-            TableRef tableRef = table.getTableRef();
-            List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
-            List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
-            SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes());
-            // TODO: It seems inefficient to be recompiling the statement again and again inside of this optimize call
-            QueryPlan dataPlan =
-                    new QueryCompiler(
-                            statement, stmt,
-                            FromCompiler.getResolverForQuery(stmt, statement.getConnection()),
-                            false, null)
-                    .compile();
-            QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, dataPlan);
-            TableRef newTableRef = plan.getTableRef();
-            if (!newTableRef.equals(tableRef)) {
-                if (replacementMap == null) {
-                    replacementMap = new HashMap<TableRef, TableRef>();
-                    dataPlanMap = new HashMap<TableRef, QueryPlan>();
-                }
-                replacementMap.put(tableRef, newTableRef);
-                dataPlanMap.put(newTableRef, dataPlan);
-            }
-        }
-
-        if (replacementMap == null)
-            return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(
-                    select, Collections.<TableRef, QueryPlan> emptyMap());
-
-        final Map<TableRef, TableRef> replacement = replacementMap;
-        TableNode from = select.getFrom();
-        TableNode newFrom = from.accept(new TableNodeVisitor<TableNode>() {
-            private TableRef resolveTable(String alias, TableName name) throws SQLException {
-                if (alias != null)
-                    return resolver.resolveTable(null, alias);
-
-                return resolver.resolveTable(name.getSchemaName(), name.getTableName());
-            }
-
-            private TableName getReplacedTableName(TableRef tableRef) {
-                String schemaName = tableRef.getTable().getSchemaName().getString();
-                return TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
-            }
-
-            @Override
-            public TableNode visit(BindTableNode boundTableNode) throws SQLException {
-                TableRef tableRef = resolveTable(boundTableNode.getAlias(), boundTableNode.getName());
-                TableRef replaceRef = replacement.get(tableRef);
-                if (replaceRef == null)
-                    return boundTableNode;
-
-                String alias = boundTableNode.getAlias();
-                return NODE_FACTORY.bindTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef));
-            }
-
-            @Override
-            public TableNode visit(JoinTableNode joinNode) throws SQLException {
-                TableNode lhs = joinNode.getLHS();
-                TableNode rhs = joinNode.getRHS();
-                TableNode lhsReplace = lhs.accept(this);
-                TableNode rhsReplace = rhs.accept(this);
-                if (lhs == lhsReplace && rhs == rhsReplace)
-                    return joinNode;
-
-                return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode(), joinNode.isSingleValueOnly());
-            }
-
-            @Override
-            public TableNode visit(NamedTableNode namedTableNode)
-                    throws SQLException {
-                TableRef tableRef = resolveTable(namedTableNode.getAlias(), namedTableNode.getName());
-                TableRef replaceRef = replacement.get(tableRef);
-                if (replaceRef == null)
-                    return namedTableNode;
-
-                String alias = namedTableNode.getAlias();
-                return NODE_FACTORY.namedTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef), namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate());
-            }
-
-            @Override
-            public TableNode visit(DerivedTableNode subselectNode)
-                    throws SQLException {
-                return subselectNode;
-            }
-        });
-
-        SelectStatement indexSelect = IndexStatementRewriter.translate(NODE_FACTORY.select(select, newFrom), resolver, replacement);
-        for ( TableRef indexTableRef : replacement.values()) {
-            // replace expressions with corresponding matching columns for functional indexes
-            indexSelect = ParseNodeRewriter.rewrite(indexSelect, new  IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), statement.getConnection(), indexSelect.getUdfParseNodes()));
-        } 
-        return new Pair<SelectStatement, Map<TableRef, QueryPlan>>(indexSelect, dataPlanMap);
-    }
-
-    private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, Double tableSamplingRate, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,
-            List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence, Map<String, UDFParseNode> udfParseNodes) {
-        String schemaName = tableRef.getTable().getSchemaName().getString();
-        TableName tName = TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
-        List<AliasedNode> selectList = new ArrayList<AliasedNode>();
-        if (isWildCardSelect) {
-            selectList.add(NODE_FACTORY.aliasedNode(null, WildcardParseNode.INSTANCE));
-        } else {
-            for (ColumnRef colRef : columnRefs.keySet()) {
-                if (colRef.getTableRef().equals(tableRef)) {
-                    ParseNode node = NODE_FACTORY.column(tName, '"' + colRef.getColumn().getName().getString() + '"', null);
-                    if (groupBy != null) {
-                        node = NODE_FACTORY.function(CountAggregateFunction.NAME, Collections.singletonList(node));
-                    }
-                    selectList.add(NODE_FACTORY.aliasedNode(null, node));
-                }
-            }
-        }
-        String tableAlias = tableRef.getTableAlias();
-        TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : '"' + tableAlias + '"', tName, dynamicCols,tableSamplingRate);
-
-        return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, null, 0,
-                groupBy != null, hasSequence, Collections.<SelectStatement> emptyList(), udfParseNodes);
-    }
-
     public static PTable joinProjectedTables(PTable left, PTable right, JoinType type) throws SQLException {
         Preconditions.checkArgument(left.getType() == PTableType.PROJECTED);
         Preconditions.checkArgument(right.getType() == PTableType.PROJECTED);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 729e439..9568ad8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -108,14 +108,15 @@ public class QueryCompiler {
     private final SequenceManager sequenceManager;
     private final boolean projectTuples;
     private final boolean noChildParentJoinOptimization;
-    private final QueryPlan dataPlan;
+    private final boolean optimizeSubquery;
+    private final Map<TableRef, QueryPlan> dataPlans;
     private final boolean costBased;
 
-    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
-        this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan);
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
+        this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, optimizeSubquery, dataPlans);
     }
 
-    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
         this.statement = statement;
         this.select = select;
         this.resolver = resolver;
@@ -136,11 +137,12 @@ public class QueryCompiler {
 
         scan.setCaching(statement.getFetchSize());
         this.originalScan = ScanUtil.newScan(scan);
-        this.dataPlan = dataPlan;
+        this.optimizeSubquery = optimizeSubquery;
+        this.dataPlans = dataPlans == null ? Collections.<TableRef, QueryPlan>emptyMap() : dataPlans;
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException {
-        this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true, null);
+        this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true, false, null);
     }
 
     /**
@@ -184,7 +186,7 @@ public class QueryCompiler {
             select.hasWildcard() ? null : select.getSelect());
         ColumnResolver resolver = FromCompiler.getResolver(tableRef);
         StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
-        QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false, null);
+        QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false);
         plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(),
             plan.getOffset(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans,
             context.getBindManager().getParameterMetaData());
@@ -195,18 +197,10 @@ public class QueryCompiler {
         List<Object> binds = statement.getParameters();
         StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
         if (select.isJoin()) {
-            Pair<SelectStatement, Map<TableRef, QueryPlan>> optimized =
-                    JoinCompiler.optimize(statement, select, resolver);
-            SelectStatement optimizedSelect = optimized.getFirst();
-            if (select != optimizedSelect) {
-                ColumnResolver resolver = FromCompiler.getResolverForQuery(optimizedSelect, statement.getConnection());
-                context = new StatementContext(statement, resolver, scan, sequenceManager);
-            }
-            JoinTable joinTable = JoinCompiler.compile(statement, optimizedSelect, context.getResolver());
-            return compileJoinQuery(
-                    context, binds, joinTable, false, false, null, optimized.getSecond());
+            JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
+            return compileJoinQuery(context, binds, joinTable, false, false, null);
         } else {
-            return compileSingleQuery(context, select, binds, false, true, dataPlan);
+            return compileSingleQuery(context, select, binds, false, true);
         }
     }
 
@@ -219,7 +213,7 @@ public class QueryCompiler {
      *      2) Otherwise, return the join plan compiled with the default strategy.
      * @see JoinCompiler.JoinTable#getApplicableJoinStrategies()
      */
-    protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
+    protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
         if (joinTable.getJoinSpecs().isEmpty()) {
             Table table = joinTable.getTable();
             SelectStatement subquery = table.getAsSubquery(orderBy);
@@ -230,8 +224,7 @@ public class QueryCompiler {
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), projector);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
                 table.projectColumns(context.getScan());
-                QueryPlan dataPlan = dataPlans.get(table.getTableRef());
-                return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true, dataPlan);
+                return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true);
             }
             QueryPlan plan = compileSubquery(subquery, false);
             PTable projectedTable = table.createProjectedTable(plan.getProjector());
@@ -243,7 +236,7 @@ public class QueryCompiler {
         assert strategies.size() > 0;
         if (!costBased || strategies.size() == 1) {
             return compileJoinQuery(
-                    strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
+                    strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy);
         }
 
         QueryPlan bestPlan = null;
@@ -252,7 +245,7 @@ public class QueryCompiler {
             StatementContext newContext = new StatementContext(
                     context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager());
             QueryPlan plan = compileJoinQuery(
-                    strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy, dataPlans);
+                    strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy);
             Cost cost = plan.getCost();
             if (bestPlan == null || cost.compareTo(bestCost) < 0) {
                 bestPlan = plan;
@@ -264,7 +257,7 @@ public class QueryCompiler {
         return bestPlan;
     }
 
-    protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
+    protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
         byte[] emptyByteArray = new byte[0];
         List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
         switch (strategy) {
@@ -307,7 +300,7 @@ public class QueryCompiler {
                     JoinSpec joinSpec = joinSpecs.get(i);
                     Scan subScan = ScanUtil.newScan(originalScan);
                     subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                    subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null, dataPlans);
+                    subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
                     boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
                     if (hasPostReference) {
                         tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
@@ -334,8 +327,7 @@ public class QueryCompiler {
                     hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
                 }
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
-                QueryPlan dataPlan = dataPlans.get(tableRef);
-                QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
+                QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
                 Integer limit = null;
                 Integer offset = null;
@@ -355,7 +347,7 @@ public class QueryCompiler {
                 JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
                 Scan subScan = ScanUtil.newScan(originalScan);
                 StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null, dataPlans);
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
                 PTable rhsProjTable;
                 TableRef rhsTableRef;
                 SelectStatement rhs;
@@ -388,8 +380,7 @@ public class QueryCompiler {
                 PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
-                QueryPlan dataPlan = dataPlans.get(rhsTableRef);
-                QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true, dataPlan);
+                QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
                 Integer limit = null;
                 Integer offset = null;
@@ -426,13 +417,13 @@ public class QueryCompiler {
                 Scan lhsScan = ScanUtil.newScan(originalScan);
                 StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
                 boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
-                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy, dataPlans);
+                QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
                 PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
                 boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
 
                 Scan rhsScan = ScanUtil.newScan(originalScan);
                 StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
-                QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy, dataPlans);
+                QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
                 PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
 
                 Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy);
@@ -459,7 +450,7 @@ public class QueryCompiler {
                         joinTable.getStatement().getUdfParseNodes())
                         : NODE_FACTORY.select(joinTable.getStatement(), from, where);
 
-                return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder, null);
+                return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
             }
             default:
                 throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
@@ -512,16 +503,18 @@ public class QueryCompiler {
         }
         int maxRows = this.statement.getMaxRows();
         this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite maxRows to avoid its impact on inner queries.
-        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, null).compile();
-        plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, optimizeSubquery, null).compile();
+        if (optimizeSubquery) {
+            plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+        }
         this.statement.setMaxRows(maxRows); // restore maxRows.
         return plan;
     }
 
-    protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan dataPlan) throws SQLException{
+    protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
         SelectStatement innerSelect = select.getInnerSelectStatement();
         if (innerSelect == null) {
-            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true, dataPlan);
+            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true);
         }
 
         QueryPlan innerPlan = compileSubquery(innerSelect, false);
@@ -536,10 +529,10 @@ public class QueryCompiler {
         context.setCurrentTable(tableRef);
         boolean isInRowKeyOrder = innerPlan.getGroupBy() == GroupBy.EMPTY_GROUP_BY && innerPlan.getOrderBy() == OrderBy.EMPTY_ORDER_BY;
 
-        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder, null);
+        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder);
     }
 
-    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder, QueryPlan dataPlan) throws SQLException{
+    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{
         PTable projectedTable = null;
         if (this.projectTuples) {
             projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);
@@ -596,6 +589,7 @@ public class QueryCompiler {
         }
         
         QueryPlan plan = innerPlan;
+        QueryPlan dataPlan = dataPlans.get(tableRef);
         if (plan == null) {
             ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory;
             plan = select.getFrom() == null
@@ -607,6 +601,7 @@ public class QueryCompiler {
                             : new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy,
                                     parallelIteratorFactory, allowPageFilter, dataPlan));
         }
+        SelectStatement planSelect = asSubquery ? select : this.select;
         if (!subqueries.isEmpty()) {
             int count = subqueries.size();
             WhereClauseSubPlan[] subPlans = new WhereClauseSubPlan[count];
@@ -615,7 +610,7 @@ public class QueryCompiler {
                 SelectStatement stmt = subqueryNode.getSelectNode();
                 subPlans[i++] = new WhereClauseSubPlan(compileSubquery(stmt, false), stmt, subqueryNode.expectSingleRow());
             }
-            plan = HashJoinPlan.create(select, plan, null, subPlans);
+            plan = HashJoinPlan.create(planSelect, plan, null, subPlans);
         }
 
         if (innerPlan != null) {
@@ -623,9 +618,9 @@ public class QueryCompiler {
                 where = null; // we do not pass "true" as filter
             }
             plan = select.isAggregate() || select.isDistinct()
-                    ? new ClientAggregatePlan(context, select, tableRef, projector, limit, offset, where, orderBy,
+                    ? new ClientAggregatePlan(context, planSelect, tableRef, projector, limit, offset, where, orderBy,
                             groupBy, having, plan)
-                    : new ClientScanPlan(context, select, tableRef, projector, limit, offset, where, orderBy, plan);
+                    : new ClientScanPlan(context, planSelect, tableRef, projector, limit, offset, where, orderBy, plan);
 
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index a926e06..af19ed1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -47,6 +47,11 @@ import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Lists;
 
+/*
+ * Class for flattening derived-tables when possible. A derived-table can be
+ * flattened if the merged statement preserves the same semantics as the original
+ * statement.
+ */
 public class SubselectRewriter extends ParseNodeRewriter {
     private final String tableAlias;
     private final Map<String, ParseNode> aliasMap;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 92e3b8a..1a9a686 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -549,7 +549,7 @@ public class UpsertCompiler {
             select = SelectStatement.create(select, hint);
             // Pass scan through if same table in upsert and select so that projection is computed correctly
             // Use optimizer to choose the best plan
-            QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false, null);
+            QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false, false, null);
             queryPlanToBe = compiler.compile();
             // This is post-fix: if the tableRef is a projected table, this means there are post-processing
             // steps and parallelIteratorFactory did not take effect.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 1058400..4a692d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -477,7 +477,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 select = StatementNormalizer.normalize(transformedSelect, resolver);
             }
 
-            QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true, null).compile();
+            QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true, false, null).compile();
             plan.getContext().getSequenceManager().validateSequences(seqAction);
             return plan;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java
new file mode 100644
index 0000000..567e92e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/GenSubqueryParamValuesRewriter.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import org.apache.phoenix.compile.ExpressionCompiler;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.ArrayAllComparisonNode;
+import org.apache.phoenix.parse.ArrayAnyComparisonNode;
+import org.apache.phoenix.parse.ComparisonParseNode;
+import org.apache.phoenix.parse.ExistsParseNode;
+import org.apache.phoenix.parse.InParseNode;
+import org.apache.phoenix.parse.OrParseNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.ParseNodeRewriter;
+import org.apache.phoenix.parse.SubqueryParseNode;
+import org.apache.phoenix.schema.types.PDataType;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Creates a new WHERE clause by replacing non-correlated sub-queries with dummy values.
+ *
+ * Note that this class does not check the presence of correlation, thus it should only
+ * be used after de-correlation has been performed.
+ */
+public class GenSubqueryParamValuesRewriter extends ParseNodeRewriter {
+    private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
+
+    private final ExpressionCompiler expressionCompiler;
+
+    public static ParseNode replaceWithDummyValues(
+            ParseNode where, StatementContext context) throws SQLException {
+        return rewrite(where, new GenSubqueryParamValuesRewriter(context));
+    }
+
+    private GenSubqueryParamValuesRewriter(StatementContext context) {
+        this.expressionCompiler = new ExpressionCompiler(context);
+    }
+
+    protected List<ParseNode> generateDummyValues(
+            ParseNode lhs, boolean multipleValues) throws SQLException {
+        Expression expr = lhs.accept(expressionCompiler);
+        PDataType type = expr.getDataType();
+        if (!multipleValues) {
+            return Arrays.<ParseNode> asList(NODE_FACTORY.literal(type.getSampleValue(), type));
+        }
+
+        return Arrays.<ParseNode> asList(
+                NODE_FACTORY.literal(type.getSampleValue(), type),
+                NODE_FACTORY.literal(type.getSampleValue(), type),
+                NODE_FACTORY.literal(type.getSampleValue(), type));
+    }
+
+    @Override
+    public ParseNode visitLeave(AndParseNode node, List<ParseNode> l) throws SQLException {
+        return leaveCompoundNode(node, l, new CompoundNodeFactory() {
+            @Override
+            public ParseNode createNode(List<ParseNode> children) {
+                if (children.isEmpty()) {
+                    return null;
+                }
+                if (children.size() == 1) {
+                    return children.get(0);
+                }
+                return NODE_FACTORY.and(children);
+            }
+        });
+    }
+
+    @Override
+    public ParseNode visitLeave(OrParseNode node, List<ParseNode> l) throws SQLException {
+        return leaveCompoundNode(node, l, new CompoundNodeFactory() {
+            @Override
+            public ParseNode createNode(List<ParseNode> children) {
+                if (children.isEmpty()) {
+                    return null;
+                }
+                if (children.size() == 1) {
+                    return children.get(0);
+                }
+                return NODE_FACTORY.or(children);
+            }
+        });
+    }
+
+    @Override
+    public ParseNode visitLeave(InParseNode node, List<ParseNode> l) throws SQLException {
+        ParseNode lhs = l.get(0);
+        List<ParseNode> inList = generateDummyValues(lhs, true);
+        List<ParseNode> children = new ArrayList<ParseNode>();
+        children.add(lhs);
+        children.addAll(inList);
+        return NODE_FACTORY.inList(children, node.isNegate());
+    }
+
+    @Override
+    public ParseNode visitLeave(ExistsParseNode node, List<ParseNode> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> l) throws SQLException {
+        if (!(l.get(1) instanceof SubqueryParseNode)) {
+            return super.visitLeave(node, l);
+        }
+
+        ParseNode lhs = l.get(0);
+        List<ParseNode> rhs = generateDummyValues(lhs, false);
+        List<ParseNode> children = new ArrayList<ParseNode>();
+        children.add(lhs);
+        children.add(rhs.get(0));
+        return super.visitLeave(node, children);
+    }
+
+    @Override
+    public ParseNode visitLeave(ArrayAnyComparisonNode node, List<ParseNode> l) throws SQLException {
+        ComparisonParseNode compare = (ComparisonParseNode) l.get(1);
+        ParseNode lhs = compare.getLHS();
+        List<ParseNode> rhs = generateDummyValues(lhs, false);
+
+        return NODE_FACTORY.comparison(compare.getFilterOp(), lhs, rhs.get(0));
+    }
+
+    @Override
+    public ParseNode visitLeave(ArrayAllComparisonNode node, List<ParseNode> l) throws SQLException {
+        ComparisonParseNode compare = (ComparisonParseNode) l.get(1);
+        ParseNode lhs = compare.getLHS();
+        List<ParseNode> rhs = generateDummyValues(lhs, false);
+
+        return NODE_FACTORY.comparison(compare.getFilterOp(), lhs, rhs.get(0));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 8481bc5..31f5c34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -21,33 +21,45 @@ package org.apache.phoenix.optimize;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExpressionCompiler;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexStatementRewriter;
+import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.compile.SubqueryRewriter;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.BindTableNode;
 import org.apache.phoenix.parse.BooleanParseNodeVisitor;
 import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.IndexExpressionParseNodeRewriter;
+import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.ParseNodeRewriter;
 import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.TableNodeVisitor;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -59,7 +71,6 @@ import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 
 import com.google.common.collect.Lists;
@@ -70,7 +81,7 @@ public class QueryOptimizer {
     private final QueryServices services;
     private final boolean useIndexes;
     private final boolean costBased;
-    private long indexPendingDisabledThreshold;
+    private final long indexPendingDisabledThreshold;
 
     public QueryOptimizer(QueryServices services) {
         this.services = services;
@@ -111,18 +122,81 @@ public class QueryOptimizer {
     }
     
     private List<QueryPlan> getApplicablePlans(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException {
+        if (!useIndexes) {
+            return Collections.singletonList(dataPlan);
+        }
+
+        if (dataPlan instanceof BaseQueryPlan) {
+            return getApplicablePlans((BaseQueryPlan) dataPlan, statement, targetColumns, parallelIteratorFactory, stopAtBestPlan);
+        }
+
+        SelectStatement select = (SelectStatement) dataPlan.getStatement();
+        ColumnResolver resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
+        Map<TableRef, QueryPlan> dataPlans = null;
+
+        // Find the optimal index plan for each join tables in a join query or a
+        // non-correlated sub-query, then rewrite the query with found index tables.
+        if (select.isJoin()
+                || (select.getWhere() != null && select.getWhere().hasSubquery())) {
+            JoinCompiler.JoinTable join = JoinCompiler.compile(statement, select, resolver);
+            Map<TableRef, TableRef> replacement = null;
+            for (JoinCompiler.Table table : join.getTables()) {
+                if (table.isSubselect())
+                    continue;
+                TableRef tableRef = table.getTableRef();
+                SelectStatement stmt = table.getAsSubqueryForOptimization(tableRef.equals(dataPlan.getTableRef()));
+                // Replace non-correlated sub-queries in WHERE clause with dummy values
+                // so the filter conditions can be taken into account in optimization.
+                if (stmt.getWhere() != null && stmt.getWhere().hasSubquery()) {
+                    StatementContext context =
+                            new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));;
+                    ParseNode dummyWhere = GenSubqueryParamValuesRewriter.replaceWithDummyValues(stmt.getWhere(), context);
+                    stmt = FACTORY.select(stmt, dummyWhere);
+                }
+                // TODO: It seems inefficient to be recompiling the statement again inside of this optimize call
+                QueryPlan subDataPlan =
+                        new QueryCompiler(
+                                statement, stmt,
+                                FromCompiler.getResolverForQuery(stmt, statement.getConnection()),
+                                false, false, null)
+                                .compile();
+                QueryPlan subPlan = optimize(statement, subDataPlan);
+                TableRef newTableRef = subPlan.getTableRef();
+                if (!newTableRef.equals(tableRef)) {
+                    if (replacement == null) {
+                        replacement = new HashMap<TableRef, TableRef>();
+                        dataPlans = new HashMap<TableRef, QueryPlan>();
+                    }
+                    replacement.put(tableRef, newTableRef);
+                    dataPlans.put(newTableRef, subDataPlan);
+                }
+            }
+
+            if (replacement != null) {
+                select = rewriteQueryWithIndexReplacement(
+                        statement.getConnection(), resolver, select, replacement);
+                resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
+            }
+        }
+
+        // Re-compile the plan with option "optimizeSubquery" turned on, so that enclosed
+        // sub-queries can be optimized recursively.
+        QueryCompiler compiler = new QueryCompiler(statement, select, resolver,
+                targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(),
+                true, true, dataPlans);
+        return Collections.singletonList(compiler.compile());
+    }
+
+    private List<QueryPlan> getApplicablePlans(BaseQueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException {
         SelectStatement select = (SelectStatement)dataPlan.getStatement();
         // Exit early if we have a point lookup as we can't get better than that
-        if (!useIndexes 
-                || (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan)) {
-            return Collections.singletonList(dataPlan);
+        if (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan) {
+            return Collections.<QueryPlan> singletonList(dataPlan);
         }
-        // For single query tuple projection, indexes are inherited from the original table to the projected
-        // table; otherwise not. So we pass projected table here, which is enough to tell if this is from a
-        // single query or a part of join query.
-        List<PTable>indexes = Lists.newArrayList(dataPlan.getContext().getResolver().getTables().get(0).getTable().getIndexes());
+
+        List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
         if (indexes.isEmpty() || dataPlan.isDegenerate() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX)) {
-            return Collections.singletonList(dataPlan);
+            return Collections.<QueryPlan> singletonList(dataPlan);
         }
         
         // The targetColumns is set for UPSERT SELECT to ensure that the proper type conversion takes place.
@@ -237,12 +311,13 @@ public class QueryOptimizer {
         TableRef indexTableRef = resolver.getTables().get(0);
         PTable indexTable = indexTableRef.getTable();
         PIndexState indexState = indexTable.getIndexState();
+        Map<TableRef, QueryPlan> dataPlans = Collections.singletonMap(indexTableRef, dataPlan);
         if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE
                 || (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) {
             try {
             	// translate nodes that match expressions that are indexed to the associated column parse node
                 indexSelect = ParseNodeRewriter.rewrite(indexSelect, new  IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes()));
-                QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan);
+                QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, true, dataPlans);
                 
                 QueryPlan plan = compiler.compile();
                 // If query doesn't have where clause and some of columns to project are missing
@@ -314,7 +389,7 @@ public class QueryOptimizer {
                         query = SubqueryRewriter.transform(query, queryResolver, statement.getConnection());
                         queryResolver = FromCompiler.getResolverForQuery(query, statement.getConnection());
                         query = StatementNormalizer.normalize(query, queryResolver);
-                        QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan).compile();
+                        QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, true, dataPlans).compile();
                         return plan;
                     }
                 }
@@ -552,5 +627,76 @@ public class QueryOptimizer {
             return node;
         }
     }
-    
+
+    private static SelectStatement rewriteQueryWithIndexReplacement(
+            final PhoenixConnection connection, final ColumnResolver resolver,
+            final SelectStatement select, final Map<TableRef, TableRef> replacement) throws SQLException {
+        TableNode from = select.getFrom();
+        TableNode newFrom = from.accept(new TableNodeVisitor<TableNode>() {
+            private TableRef resolveTable(String alias, TableName name) throws SQLException {
+                if (alias != null)
+                    return resolver.resolveTable(null, alias);
+
+                return resolver.resolveTable(name.getSchemaName(), name.getTableName());
+            }
+
+            private TableName getReplacedTableName(TableRef tableRef) {
+                String schemaName = tableRef.getTable().getSchemaName().getString();
+                return TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString());
+            }
+
+            @Override
+            public TableNode visit(BindTableNode boundTableNode) throws SQLException {
+                TableRef tableRef = resolveTable(boundTableNode.getAlias(), boundTableNode.getName());
+                TableRef replaceRef = replacement.get(tableRef);
+                if (replaceRef == null)
+                    return boundTableNode;
+
+                String alias = boundTableNode.getAlias();
+                return FACTORY.bindTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef));
+            }
+
+            @Override
+            public TableNode visit(JoinTableNode joinNode) throws SQLException {
+                TableNode lhs = joinNode.getLHS();
+                TableNode rhs = joinNode.getRHS();
+                TableNode lhsReplace = lhs.accept(this);
+                TableNode rhsReplace = rhs.accept(this);
+                if (lhs == lhsReplace && rhs == rhsReplace)
+                    return joinNode;
+
+                return FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode(), joinNode.isSingleValueOnly());
+            }
+
+            @Override
+            public TableNode visit(NamedTableNode namedTableNode)
+                    throws SQLException {
+                TableRef tableRef = resolveTable(namedTableNode.getAlias(), namedTableNode.getName());
+                TableRef replaceRef = replacement.get(tableRef);
+                if (replaceRef == null)
+                    return namedTableNode;
+
+                String alias = namedTableNode.getAlias();
+                return FACTORY.namedTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef), namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate());
+            }
+
+            @Override
+            public TableNode visit(DerivedTableNode subselectNode)
+                    throws SQLException {
+                return subselectNode;
+            }
+        });
+
+        if (from == newFrom) {
+            return select;
+        }
+
+        SelectStatement indexSelect = IndexStatementRewriter.translate(FACTORY.select(select, newFrom), resolver, replacement);
+        for (TableRef indexTableRef : replacement.values()) {
+            // replace expressions with corresponding matching columns for functional indexes
+            indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), connection, indexSelect.getUdfParseNodes()));
+        }
+
+        return indexSelect;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98a8bbd1/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 11f5f22..bac4ee8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -42,6 +42,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
@@ -83,7 +84,9 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PInteger;
@@ -4309,7 +4312,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             assertEquals(e.getErrorCode(), SQLExceptionCode.CONNECTION_CLOSED.getErrorCode());
         }
     }
-    
+
     @Test
     public void testSingleColLocalIndexPruning() throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -4656,6 +4659,115 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
         }
     }
 
+    @Test
+    public void testQueryPlanSourceRefsInHashJoin() throws SQLException {
+        String query = "SELECT * FROM (\n" +
+                "    SELECT K1, V1 FROM A WHERE V1 = 'A'\n" +
+                ") T1 JOIN (\n" +
+                "    SELECT K2, V2 FROM B WHERE V2 = 'B'\n" +
+                ") T2 ON K1 = K2 ORDER BY V1";
+        verifyQueryPlanSourceRefs(query, 2);
+    }
+
+    @Test
+    public void testQueryPlanSourceRefsInSortMergeJoin() throws SQLException {
+        String query = "SELECT * FROM (\n" +
+                "    SELECT max(K1) KEY1, V1 FROM A GROUP BY V1\n" +
+                ") T1 JOIN (\n" +
+                "    SELECT max(K2) KEY2, V2 FROM B GROUP BY V2\n" +
+                ") T2 ON KEY1 = KEY2 ORDER BY V1";
+        verifyQueryPlanSourceRefs(query, 2);
+    }
+
+    @Test
+    public void testQueryPlanSourceRefsInSubquery() throws SQLException {
+        String query = "SELECT * FROM A\n" +
+                "WHERE K1 > (\n" +
+                "    SELECT max(K2) FROM B WHERE V2 = V1\n" +
+                ") ORDER BY V1";
+        verifyQueryPlanSourceRefs(query, 2);
+    }
+
+    @Test
+    public void testQueryPlanSourceRefsInSubquery2() throws SQLException {
+        String query = "SELECT * FROM A\n" +
+                "WHERE V1 > ANY (\n" +
+                "    SELECT K2 FROM B WHERE V2 = 'B'\n" +
+                ")";
+        verifyQueryPlanSourceRefs(query, 2);
+    }
+
+    @Test
+    public void testQueryPlanSourceRefsInSubquery3() throws SQLException {
+        String query = "SELECT * FROM A\n" +
+                "WHERE V1 > ANY (\n" +
+                "    SELECT K2 FROM B B1" +
+                "    WHERE V2 = (\n" +
+                "        SELECT max(V2) FROM B B2\n" +
+                "        WHERE B2.K2 = B1.K2 AND V2 < 'K'\n" +
+                "    )\n" +
+                ")";
+        verifyQueryPlanSourceRefs(query, 3);
+    }
+
+    @Test
+    public void testQueryPlanSourceRefsInSubquery4() throws SQLException {
+        String query = "SELECT * FROM (\n" +
+                "    SELECT K1, K2 FROM A\n" +
+                "    JOIN B ON K1 = K2\n" +
+                "    WHERE V1 = 'A' AND V2 = 'B'\n" +
+                "    LIMIT 10\n" +
+                ") ORDER BY K1";
+        verifyQueryPlanSourceRefs(query, 2);
+    }
+
+    @Test
+    public void testQueryPlanSourceRefsInSubquery5() throws SQLException {
+        String query = "SELECT * FROM (\n" +
+                "    SELECT KEY1, KEY2 FROM (\n" +
+                "        SELECT max(K1) KEY1, V1 FROM A GROUP BY V1\n" +
+                "    ) T1 JOIN (\n" +
+                "        SELECT max(K2) KEY2, V2 FROM B GROUP BY V2\n" +
+                "    ) T2 ON KEY1 = KEY2 LIMIT 10\n" +
+                ") ORDER BY KEY1";
+        verifyQueryPlanSourceRefs(query, 2);
+    }
+
+    @Test
+    public void testQueryPlanSourceRefsInUnion() throws SQLException {
+        String query = "SELECT K1, V1 FROM A WHERE V1 = 'A'\n" +
+                "UNION ALL\n" +
+                "SELECT K2, V2 FROM B WHERE V2 = 'B'";
+        verifyQueryPlanSourceRefs(query, 2);
+    }
+
+    private void verifyQueryPlanSourceRefs(String query, int refCount) throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE A (\n" +
+                    "    K1 VARCHAR(10) NOT NULL PRIMARY KEY,\n" +
+                    "    V1 VARCHAR(10))");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX1 ON A(V1)");
+            conn.createStatement().execute("CREATE TABLE B (\n" +
+                    "    K2 VARCHAR(10) NOT NULL PRIMARY KEY,\n" +
+                    "    V2 VARCHAR(10))");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX2 ON B(V2)");
+            PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = stmt.compileQuery(query);
+            Set<TableRef> sourceRefs = plan.getSourceRefs();
+            assertEquals(refCount, sourceRefs.size());
+            for (TableRef table : sourceRefs) {
+                assertTrue(table.getTable().getType() == PTableType.TABLE);
+            }
+            plan = stmt.optimizeQuery(query);
+            sourceRefs = plan.getSourceRefs();
+            assertEquals(refCount, sourceRefs.size());
+            for (TableRef table : sourceRefs) {
+                assertTrue(table.getTable().getType() == PTableType.INDEX);
+            }
+        }
+    }
+
     private static class MultipleChildrenExtractor implements QueryPlanVisitor<List<QueryPlan>> {
 
         @Override


[2/2] phoenix git commit: PHOENIX-4682 UngroupedAggregateRegionObserver preCompactScannerOpen hook should not throw exceptions

Posted by pb...@apache.org.
PHOENIX-4682 UngroupedAggregateRegionObserver preCompactScannerOpen hook should not throw exceptions


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce3e5867
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce3e5867
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce3e5867

Branch: refs/heads/4.x-cdh5.11
Commit: ce3e5867eef9bf10038cc3729afdcfee1c27aa14
Parents: 98a8bbd
Author: Vincent Poon <vi...@apache.org>
Authored: Wed Apr 4 19:06:24 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Wed Apr 4 19:37:35 2018 +0100

----------------------------------------------------------------------
 .../phoenix/end2end/index/MutableIndexIT.java   | 49 ++++++++++--
 .../UngroupedAggregateRegionObserver.java       | 81 ++++++++++++--------
 2 files changed, 88 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3e5867/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index efae15e..4b88b92 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -41,29 +41,26 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.*;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -867,6 +864,42 @@ public class MutableIndexIT extends ParallelStatsDisabledIT {
       }
   }
 
+  // some tables (e.g. indexes on views) have UngroupedAgg coproc loaded, but don't have a
+  // corresponding row in syscat.  This tests that compaction isn't blocked
+  @Test(timeout=120000)
+  public void testCompactNonPhoenixTable() throws Exception {
+      try (Connection conn = getConnection()) {
+          // create a vanilla HBase table (non-Phoenix)
+          String randomTable = generateUniqueName();
+          TableName hbaseTN = TableName.valueOf(randomTable);
+          byte[] famBytes = Bytes.toBytes("fam");
+          HTable hTable = getUtility().createTable(hbaseTN, famBytes);
+          TestUtil.addCoprocessor(conn, randomTable, UngroupedAggregateRegionObserver.class);
+          Put put = new Put(Bytes.toBytes("row"));
+          byte[] value = new byte[1];
+          Bytes.random(value);
+          put.add(famBytes, Bytes.toBytes("colQ"), value);
+          hTable.put(put);
+          hTable.flushCommits();
+
+          // major compaction shouldn't cause a timeout or RS abort
+          List<HRegion> regions = getUtility().getHBaseCluster().getRegions(hbaseTN);
+          HRegion hRegion = regions.get(0);
+          hRegion.flush(true);
+          HStore store = (HStore) hRegion.getStore(famBytes);
+          store.triggerMajorCompaction();
+          store.compactRecentForTestingAssumingDefaultPolicy(1);
+
+          // we should be able to compact syscat itself as well
+          regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+          hRegion = regions.get(0);
+          hRegion.flush(true);
+          store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+          store.triggerMajorCompaction();
+          store.compactRecentForTestingAssumingDefaultPolicy(1);
+      }
+  }
+
 private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException {
     PreparedStatement stmt = tenantConn.prepareStatement(dml);
       stmt.setString(1, "00000000000000" + String.valueOf(i));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3e5867/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 72ca58d..965ba1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -99,6 +99,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
@@ -112,6 +113,7 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -962,35 +964,36 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     }
 
     @Override
-    public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
-            final InternalScanner scanner, final ScanType scanType) throws IOException {
-        // Compaction and split upcalls run with the effective user context of the requesting user.
-        // This will lead to failure of cross cluster RPC if the effective user is not
-        // the login user. Switch to the login user context to ensure we have the expected
-        // security context.
-        return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-            @Override
-            public InternalScanner run() throws Exception {
-                TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
-                InternalScanner internalScanner = scanner;
-                if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+    public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+            final Store store, final InternalScanner scanner, final ScanType scanType)
+                    throws IOException {
+        if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+            final TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+            // Compaction and split upcalls run with the effective user context of the requesting user.
+            // This will lead to failure of cross cluster RPC if the effective user is not
+            // the login user. Switch to the login user context to ensure we have the expected
+            // security context.
+            return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+                @Override public InternalScanner run() throws Exception {
+                    InternalScanner internalScanner = scanner;
                     try {
                         long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
                         StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
                             c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
                             store.getFamily().getName());
                         internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner);
-                    } catch (IOException e) {
+                    } catch (Exception e) {
                         // If we can't reach the stats table, don't interrupt the normal
-                      // compaction operation, just log a warning.
-                      if (logger.isWarnEnabled()) {
-                          logger.warn("Unable to collect stats for " + table, e);
-                      }
+                        // compaction operation, just log a warning.
+                        if (logger.isWarnEnabled()) {
+                            logger.warn("Unable to collect stats for " + table, e);
+                        }
                     }
+                    return internalScanner;
                 }
-                return internalScanner;
-            }
-        });
+            });
+        }
+        return scanner;
     }
 
     private static PTable deserializeTable(byte[] b) {
@@ -1362,23 +1365,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // This will lead to failure of cross cluster RPC if the effective user is not
         // the login user. Switch to the login user context to ensure we have the expected
         // security context.
-        return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-            @Override
-            public InternalScanner run() throws Exception {
-                // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
-                if (request.isMajor()) {
-                    String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-                        try (PhoenixConnection conn =
-                                QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
-                        String baseTable = fullTableName;
-                        PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
+        final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+        // since we will make a call to syscat, do nothing if we are compacting syscat itself
+        if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
+            return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+                @Override
+                public InternalScanner run() throws Exception {
+                    // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
+                    try (PhoenixConnection conn =
+                            QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+                        PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
                         List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes();
                         // FIXME need to handle views and indexes on views as well
                         for (PTable index : indexes) {
                             if (index.getIndexDisableTimestamp() != 0) {
                                 logger.info(
                                     "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
-                                            + baseTable);
+                                            + fullTableName);
                                 Scan scan = new Scan();
                                 scan.setMaxVersions();
                                 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
@@ -1386,10 +1389,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     HConstants.OLDEST_TIMESTAMP);
                             }
                         }
+                    } catch (Exception e) {
+                        if (e instanceof TableNotFoundException) {
+                            logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
+                            // non-Phoenix HBase tables won't be found, do nothing
+                        } else {
+                            logger.error("Unable to modify compaction scanner to retain deleted cells for a table with disabled Index; "
+                                    + fullTableName,
+                                    e);
+                        }
                     }
+                    return s;
                 }
-                return s;
-            }
-        });
+            });
+        }
+        return s;
     }
 }