You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/10/14 23:50:17 UTC

git commit: PHOENIX-943 Handle pushed down post-filters for subquery in joins with limit and non-groupby aggregation

Repository: phoenix
Updated Branches:
  refs/heads/master 34bfc63c9 -> 8ece1a74a


PHOENIX-943 Handle pushed down post-filters for subquery in joins with limit and non-groupby aggregation


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ece1a74
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ece1a74
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ece1a74

Branch: refs/heads/master
Commit: 8ece1a74afae2112b4959147b19fbf0e215b8de7
Parents: 34bfc63
Author: maryannxue <ma...@apache.org>
Authored: Tue Oct 14 17:49:53 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Tue Oct 14 17:49:53 2014 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  |  49 +++++++++
 .../apache/phoenix/compile/JoinCompiler.java    |  64 +++++++----
 .../apache/phoenix/compile/QueryCompiler.java   |  14 ++-
 .../phoenix/compile/StatementContext.java       |  12 +--
 .../phoenix/compile/SubselectRewriter.java      |  11 +-
 .../phoenix/execute/DelegateQueryPlan.java      | 105 ++++++++++++++++++
 .../apache/phoenix/execute/HashJoinPlan.java    | 106 +++----------------
 .../phoenix/execute/TupleProjectionPlan.java    | 100 +++++++++++++++++
 .../apache/phoenix/join/HashCacheClient.java    |   9 +-
 .../org/apache/phoenix/join/TupleProjector.java |   6 ++
 10 files changed, 334 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 8e35216..da8f447 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -41,6 +41,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
@@ -3379,6 +3380,36 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
     }
     
     @Test
+    public void testJoinWithSubqueryPostFilters() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " LIMIT 5) AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')";
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
     public void testJoinWithSubqueryAndAggregation() throws Exception {
         String query1 = "SELECT i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN (SELECT name, \"item_id\" iid FROM " 
             + JOIN_ITEM_TABLE_FULL_NAME + ") AS i ON o.\"item_id\" = i.iid GROUP BY i.name ORDER BY i.name";
@@ -3796,5 +3827,23 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         }
     }
 
+    @Test
+    public void testUnsupportedJoinConditions() throws Exception {
+        String query = "SELECT * FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON (item.\"supplier_id\" || supp.\"supplier_id\") = ''";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            try {
+                statement.executeQuery();
+                fail("Should have got SQLFeatureNotSupportedException.");
+            } catch (SQLFeatureNotSupportedException e) {
+                assertEquals("Does not support non-standard or non-equi join conditions.", e.getMessage());
+            }
+        } finally {
+            conn.close();
+        }
+    }
+
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 7e5382e..ad1c6a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -313,6 +313,9 @@ public class JoinCompiler {
             for (ParseNode node : table.getPreFilters()) {
                 node.accept(prefilterRefVisitor);
             }
+            for (ParseNode node : table.getPostFilters()) {
+                node.accept(generalRefVisitor);
+            }
             for (ParseNode node : postFilters) {
                 node.accept(generalRefVisitor);
             }
@@ -331,22 +334,12 @@ public class JoinCompiler {
             }
         }
         
-        public Expression compilePostFilterExpression(StatementContext context) throws SQLException {
-            if (postFilters.isEmpty())
-                return null;
-            
-            ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
-            List<Expression> expressions = new ArrayList<Expression>(postFilters.size());
-            for (ParseNode postFilter : postFilters) {
-                expressionCompiler.reset();
-                Expression expression = postFilter.accept(expressionCompiler);
-                expressions.add(expression);
+        public Expression compilePostFilterExpression(StatementContext context, Table table) throws SQLException {
+            List<ParseNode> filtersCombined = Lists.<ParseNode> newArrayList(postFilters);
+            if (table != null) {
+                filtersCombined.addAll(table.getPostFilters());
             }
-            
-            if (expressions.size() == 1)
-                return expressions.get(0);
-            
-            return AndExpression.create(expressions);
+            return JoinCompiler.compilePostFilterExpression(context, filtersCombined);
         }
         
         /**
@@ -450,6 +443,9 @@ public class JoinCompiler {
             this.dependencies = new HashSet<TableRef>();
             OnNodeVisitor visitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable);
             onNode.accept(visitor);
+            if (onConditions.isEmpty()) {
+                visitor.throwUnsupportedJoinConditionException();
+            }
         }
         
         public JoinType getType() {
@@ -609,6 +605,7 @@ public class JoinCompiler {
         private final List<AliasedNode> selectNodes; // all basic nodes related to this table, no aggregation.
         private final List<ParseNode> preFilters;
         private final List<ParseNode> postFilters;
+        private final boolean isPostFilterConvertible;
         
         private Table(TableNode tableNode, List<ColumnDef> dynamicColumns, 
                 List<AliasedNode> selectNodes, TableRef tableRef) {
@@ -619,6 +616,7 @@ public class JoinCompiler {
             this.selectNodes = selectNodes;
             this.preFilters = new ArrayList<ParseNode>();
             this.postFilters = Collections.<ParseNode>emptyList();
+            this.isPostFilterConvertible = false;
         }
         
         private Table(DerivedTableNode tableNode, 
@@ -628,8 +626,9 @@ public class JoinCompiler {
             this.subselect = SubselectRewriter.flatten(tableNode.getSelect(), statement.getConnection());
             this.tableRef = tableRef;
             this.selectNodes = selectNodes;
-            this.preFilters = Collections.<ParseNode>emptyList();
+            this.preFilters = new ArrayList<ParseNode>();
             this.postFilters = new ArrayList<ParseNode>();
+            this.isPostFilterConvertible = SubselectRewriter.isPostFilterConvertible(subselect);
         }
         
         public TableNode getTableNode() {
@@ -661,21 +660,24 @@ public class JoinCompiler {
         }
         
         public void addFilter(ParseNode filter) {
-            if (!isSubselect()) {
+            if (!isSubselect() || isPostFilterConvertible) {
                 preFilters.add(filter);
-                return;
+            } else {
+                postFilters.add(filter);
             }
-            
-            postFilters.add(filter);
         }
         
         public ParseNode getPreFiltersCombined() {
             return combine(preFilters);
         }
         
+        public Expression compilePostFilterExpression(StatementContext context) throws SQLException {
+            return JoinCompiler.compilePostFilterExpression(context, postFilters);
+        }
+        
         public SelectStatement getAsSubquery() throws SQLException {
             if (isSubselect())
-                return SubselectRewriter.applyPostFilters(subselect, postFilters, tableNode.getAlias());
+                return SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias());
             
             List<TableNode> from = Collections.<TableNode>singletonList(tableNode);
             return NODE_FACTORY.select(from, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false, select.hasSequence());
@@ -970,7 +972,7 @@ public class JoinCompiler {
          * 2) a boolean condition referencing to the self table only.
          * Otherwise, it can be ambiguous.
          */
-        private void throwUnsupportedJoinConditionException() 
+        public void throwUnsupportedJoinConditionException() 
                 throws SQLFeatureNotSupportedException {
             throw new SQLFeatureNotSupportedException("Does not support non-standard or non-equi join conditions.");
         }           
@@ -1093,6 +1095,24 @@ public class JoinCompiler {
         return ret;
     }
     
+    private static Expression compilePostFilterExpression(StatementContext context, List<ParseNode> postFilters) throws SQLException {
+        if (postFilters.isEmpty())
+            return null;
+        
+        ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+        List<Expression> expressions = new ArrayList<Expression>(postFilters.size());
+        for (ParseNode postFilter : postFilters) {
+            expressionCompiler.reset();
+            Expression expression = postFilter.accept(expressionCompiler);
+            expressions.add(expression);
+        }
+        
+        if (expressions.size() == 1)
+            return expressions.get(0);
+        
+        return AndExpression.create(expressions);
+    }
+    
     public static SelectStatement optimize(PhoenixStatement statement, SelectStatement select, final ColumnResolver resolver) throws SQLException {
         TableRef groupByTableRef = null;
         TableRef orderByTableRef = null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index a2dc5b3..d82ac02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
 import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
 import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -157,8 +158,7 @@ public class QueryCompiler {
             QueryPlan plan = compileSubquery(subquery);
             ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
             context.setResolver(projectedTable.createColumnResolver());
-            context.setClientTupleProjector(projectedTable.createTupleProjector());
-            return plan;
+            return new TupleProjectionPlan(plan, projectedTable.createTupleProjector(), table.compilePostFilterExpression(context));
         }
         
         boolean[] starJoinVector = joinTable.getStarJoinVector();
@@ -197,7 +197,6 @@ public class QueryCompiler {
                 StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
                 QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true);
                 ColumnResolver resolver = subContext.getResolver();
-                TupleProjector clientProjector = subContext.getClientTupleProjector();
                 boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
                 if (hasPostReference) {
                     PTableWrapper subProjTable = ((JoinedTableColumnResolver) (resolver)).getPTableWrapper();
@@ -225,7 +224,7 @@ public class QueryCompiler {
                 if (i < count - 1) {
                     fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
                 }
-                subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters);
+                subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, hasFilters);
             }
             if (needsProject) {
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
@@ -233,7 +232,7 @@ public class QueryCompiler {
             context.setCurrentTable(tableRef);
             context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
             QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, joinTable.isAllLeftJoin());
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
+            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
             Integer limit = null;
             if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
                 limit = LimitCompiler.compile(context, query);
@@ -258,7 +257,6 @@ public class QueryCompiler {
             StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
             QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true);
             ColumnResolver lhsResolver = lhsCtx.getResolver();
-            TupleProjector clientProjector = lhsCtx.getClientTupleProjector();
             PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper();
             ProjectedPTableWrapper rhsProjTable;
             TableRef rhsTableRef;
@@ -288,7 +286,7 @@ public class QueryCompiler {
             context.setCurrentTable(rhsTableRef);
             context.setResolver(projectedTable.createColumnResolver());
             QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, type == JoinType.Right);
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
+            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
             Integer limit = null;
             if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
                 limit = LimitCompiler.compile(context, rhs);
@@ -296,7 +294,7 @@ public class QueryCompiler {
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
             Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
             getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions);
-            return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())});
+            return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), lhsJoin.hasFilters())});
         }
         
         // Do not support queries like "A right join B left join C" with hash-joins.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index b0ba6f0..47ce3c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -73,7 +72,6 @@ public class StatementContext {
 
     private TableRef currentTable;
     private List<Pair<byte[], byte[]>> whereConditionColumns;
-    private TupleProjector clientTupleProjector;
     private TimeRange scanTimeRange = null;
     
     private Map<SelectStatement, Object> subqueryResults;
@@ -85,7 +83,7 @@ public class StatementContext {
     public StatementContext(PhoenixStatement statement, Scan scan) {
         this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement));
     }
-    
+
     public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) {
         this.statement = statement;
         this.resolver = resolver;
@@ -237,14 +235,6 @@ public class StatementContext {
     public List<Pair<byte[], byte[]>> getWhereCoditionColumns() {
         return whereConditionColumns;
     }
-    
-    public TupleProjector getClientTupleProjector() {
-        return clientTupleProjector;
-    }
-    
-    public void setClientTupleProjector(TupleProjector projector) {
-        this.clientTupleProjector = projector;
-    }
 
     public void setScanTimeRange(TimeRange value){
     	this.scanTimeRange = value;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index 3edcbc2..35ea900 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -19,7 +19,6 @@
 package org.apache.phoenix.compile;
 
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -50,15 +49,15 @@ public class SubselectRewriter extends ParseNodeRewriter {
         if (postFilters.isEmpty())
             return statement;
         
-        // TODO Handle post-filters in the below two cases from JoinCompiler:
-        // 1) select ... from A join (select id, b from T limit 10) as B on A.id = B.id where B.b = 'b'
-        // 2) select ... from A join (select count(*) c from T) as B on A.a = B.c where B.c > 10
-        if (statement.getLimit() != null || (statement.isAggregate() && statement.getGroupBy().isEmpty()))
-            throw new SQLFeatureNotSupportedException();
+        assert(isPostFilterConvertible(statement));
         
         return new SubselectRewriter(null, statement.getSelect(), subqueryAlias).applyPostFilters(statement, postFilters);
     }
     
+    public static boolean isPostFilterConvertible(SelectStatement statement) throws SQLException {
+        return statement.getLimit() == null && (!statement.isAggregate() || !statement.getGroupBy().isEmpty());        
+    }
+    
     public static SelectStatement flatten(SelectStatement select, PhoenixConnection connection) throws SQLException {
         List<TableNode> from = select.getFrom();
         while (from.size() == 1 && from.get(0) instanceof DerivedTableNode) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
new file mode 100644
index 0000000..4d50ba0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.ParameterMetaData;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.TableRef;
+
+public abstract class DelegateQueryPlan implements QueryPlan {
+    protected final QueryPlan delegate;
+
+    public DelegateQueryPlan(QueryPlan delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public StatementContext getContext() {
+        return delegate.getContext();
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() {
+        return delegate.getParameterMetaData();
+    }
+
+    @Override
+    public long getEstimatedSize() {
+        return delegate.getEstimatedSize();
+    }
+
+    @Override
+    public TableRef getTableRef() {
+        return delegate.getTableRef();
+    }
+
+    @Override
+    public RowProjector getProjector() {
+        return delegate.getProjector();
+    }
+
+    @Override
+    public Integer getLimit() {
+        return delegate.getLimit();
+    }
+
+    @Override
+    public OrderBy getOrderBy() {
+        return delegate.getOrderBy();
+    }
+
+    @Override
+    public GroupBy getGroupBy() {
+        return delegate.getGroupBy();
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+        return delegate.getSplits();
+    }
+
+    @Override
+    public List<List<Scan>> getScans() {
+        return delegate.getScans();
+    }
+
+    @Override
+    public FilterableStatement getStatement() {
+        return delegate.getStatement();
+    }
+
+    @Override
+    public boolean isDegenerate() {
+        return delegate.isDegenerate();
+    }
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return delegate.isRowKeyOrdered();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index bb3940c..fce4245 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.execute;
 
 import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 
-import java.sql.ParameterMetaData;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -38,8 +37,6 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.FromCompiler;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
@@ -59,31 +56,27 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PArrayDataType;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
 import com.google.common.collect.Lists;
 
-public class HashJoinPlan implements QueryPlan {
+public class HashJoinPlan extends DelegateQueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
 
     private final FilterableStatement statement;
-    private final BaseQueryPlan plan;
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
@@ -98,7 +91,7 @@ public class HashJoinPlan implements QueryPlan {
     public static HashJoinPlan create(FilterableStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
         if (plan instanceof BaseQueryPlan)
-            return new HashJoinPlan(statement, (BaseQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
+            return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null);
         
         assert (plan instanceof HashJoinPlan);
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
@@ -111,13 +104,13 @@ public class HashJoinPlan implements QueryPlan {
         for (SubPlan subPlan : subPlans) {
             mergedSubPlans[i++] = subPlan;
         }
-        return new HashJoinPlan(statement, hashJoinPlan.plan, joinInfo, mergedSubPlans, true);
+        return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true);
     }
     
     private HashJoinPlan(FilterableStatement statement, 
-            BaseQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+        super(plan);
         this.statement = statement;
-        this.plan = plan;
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
         this.recompileWhereClause = recompileWhereClause;
@@ -126,21 +119,6 @@ public class HashJoinPlan implements QueryPlan {
     }
 
     @Override
-    public Integer getLimit() {
-        return plan.getLimit();
-    }
-
-    @Override
-    public OrderBy getOrderBy() {
-        return plan.getOrderBy();
-    }
-
-    @Override
-    public RowProjector getProjector() {
-        return plan.getProjector();
-    }
-
-    @Override
     public ResultIterator iterator() throws SQLException {
         int count = subPlans.length;
         PhoenixConnection connection = getContext().getConnection();
@@ -149,7 +127,7 @@ public class HashJoinPlan implements QueryPlan {
         List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count);
         dependencies = Lists.newArrayList();
         if (joinInfo != null) {
-            hashClient = new HashCacheClient(plan.getContext().getConnection());
+            hashClient = new HashCacheClient(delegate.getContext().getConnection());
             maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
             firstJobEndTime = new AtomicLong(0);
             keyRangeExpressions = new CopyOnWriteArrayList<Expression>();
@@ -194,24 +172,24 @@ public class HashJoinPlan implements QueryPlan {
         
         boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty();
         if (recompileWhereClause || hasKeyRangeExpressions) {
-            StatementContext context = plan.getContext();
+            StatementContext context = delegate.getContext();
             PTable table = context.getCurrentTable().getTable();
             ParseNode viewWhere = table.getViewStatement() == null ? null : new SQLParser(table.getViewStatement()).parseQuery().getWhere();
-            context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (plan.getStatement()), plan.getContext().getConnection()));
+            context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (delegate.getStatement()), delegate.getContext().getConnection()));
             if (recompileWhereClause) {
-                WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere);                
+                WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere);                
             }
             if (hasKeyRangeExpressions) {
-                WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere, keyRangeExpressions, true);
+                WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, true);
             }
         }
 
         if (joinInfo != null) {
-            Scan scan = plan.getContext().getScan();
+            Scan scan = delegate.getContext().getScan();
             HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
         }
         
-        return plan.iterator(dependencies);
+        return ((BaseQueryPlan) delegate).iterator(dependencies);
     }
 
     private Expression createKeyRangeExpression(Expression lhsExpression,
@@ -254,18 +232,8 @@ public class HashJoinPlan implements QueryPlan {
     }
 
     @Override
-    public long getEstimatedSize() {
-        return plan.getEstimatedSize();
-    }
-
-    @Override
-    public List<KeyRange> getSplits() {
-        return plan.getSplits();
-    }
-
-    @Override
     public ExplainPlan getExplainPlan() throws SQLException {
-        List<String> planSteps = Lists.newArrayList(plan.getExplainPlan().getPlanSteps());
+        List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
         int count = subPlans.length;
         for (int i = 0; i < count; i++) {
             planSteps.addAll(subPlans[i].getPreSteps(this));
@@ -285,35 +253,10 @@ public class HashJoinPlan implements QueryPlan {
     }
 
     @Override
-    public ParameterMetaData getParameterMetaData() {
-        return plan.getParameterMetaData();
-    }
-
-    @Override
-    public StatementContext getContext() {
-        return plan.getContext();
-    }
-
-    @Override
-    public GroupBy getGroupBy() {
-        return plan.getGroupBy();
-    }
-
-    @Override
-    public TableRef getTableRef() {
-        return plan.getTableRef();
-    }
-
-    @Override
     public FilterableStatement getStatement() {
         return statement;
     }
 
-    @Override
-    public boolean isDegenerate() {
-        return false;
-    }
-
     protected interface SubPlan {
         public Object execute(HashJoinPlan parent) throws SQLException;
         public void postProcess(Object result, HashJoinPlan parent) throws SQLException;
@@ -398,7 +341,6 @@ public class HashJoinPlan implements QueryPlan {
         private final boolean singleValueOnly;
         private final Expression keyRangeLhsExpression;
         private final Expression keyRangeRhsExpression;
-        private final TupleProjector clientProjector;
         private final boolean hasFilters;
         
         public HashSubPlan(int index, QueryPlan subPlan, 
@@ -406,20 +348,19 @@ public class HashJoinPlan implements QueryPlan {
                 boolean singleValueOnly,
                 Expression keyRangeLhsExpression, 
                 Expression keyRangeRhsExpression, 
-                TupleProjector clientProjector, boolean hasFilters) {
+                boolean hasFilters) {
             this.index = index;
             this.plan = subPlan;
             this.hashExpressions = hashExpressions;
             this.singleValueOnly = singleValueOnly;
             this.keyRangeLhsExpression = keyRangeLhsExpression;
             this.keyRangeRhsExpression = keyRangeRhsExpression;
-            this.clientProjector = clientProjector;
             this.hasFilters = hasFilters;
         }
 
         @Override
         public Object execute(HashJoinPlan parent) throws SQLException {
-            ScanRanges ranges = parent.plan.getContext().getScanRanges();
+            ScanRanges ranges = parent.delegate.getContext().getScanRanges();
             List<ImmutableBytesWritable> keyRangeRhsValues = null;
             if (keyRangeRhsExpression != null) {
                 keyRangeRhsValues = Lists.<ImmutableBytesWritable>newArrayList();
@@ -427,19 +368,16 @@ public class HashJoinPlan implements QueryPlan {
             ServerCache cache = null;
             if (hashExpressions != null) {
                 cache = parent.hashClient.addHashCache(ranges, plan.iterator(), 
-                        clientProjector, plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
+                        plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.delegate.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
                 long endTime = System.currentTimeMillis();
                 boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
                 if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
-                    LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", parent.plan.context.getConnection()));
+                    LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", parent.delegate.getContext().getConnection()));
                 }
             } else {
                 assert(keyRangeRhsExpression != null);
                 ResultIterator iterator = plan.iterator();
                 for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
-                    if (clientProjector != null) {
-                        result = clientProjector.projectResults(result);
-                    }
                     // Evaluate key expressions for hash join key range optimization.
                     ImmutableBytesWritable value = new ImmutableBytesWritable();
                     keyRangeRhsExpression.reset();
@@ -494,16 +432,6 @@ public class HashJoinPlan implements QueryPlan {
         }
         
     }
-
-    @Override
-    public boolean isRowKeyOrdered() {
-        return plan.isRowKeyOrdered();
-    }
-
-    @Override
-    public List<List<Scan>> getScans() {
-        return plan.getScans();
-    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
new file mode 100644
index 0000000..410d386
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.join.TupleProjector;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+import com.google.common.collect.Lists;
+
+public class TupleProjectionPlan extends DelegateQueryPlan { // Wraps another plan: projects every tuple it yields and, if a post-filter is set, drops tuples that fail it.
+    private final TupleProjector tupleProjector; // applied to every tuple of the wrapped plan; never null (checked in the constructor)
+    private final Expression postFilter; // optional filter evaluated after projection; null means no filtering
+
+    public TupleProjectionPlan(QueryPlan plan, TupleProjector tupleProjector, Expression postFilter) { // plan is handed to the superclass as the delegate; postFilter may be null
+        super(plan);
+        if (tupleProjector == null) throw new IllegalArgumentException("tupleProjector is null");
+        this.tupleProjector = tupleProjector;
+        this.postFilter = postFilter;
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException { // the delegate's plan steps, followed by a "CLIENT FILTER BY" step when a post-filter is set
+        List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps()); // steps are copied into a new list before anything is appended
+        if (postFilter != null) {
+            planSteps.add("CLIENT FILTER BY " + postFilter.toString());
+        }
+        
+        return new ExplainPlan(planSteps);
+    }
+
+    @Override
+    public ResultIterator iterator() throws SQLException { // wraps the delegate's iterator so each tuple is projected, then filtered
+        final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); // scratch pointer shared by all postFilter evaluations of this iterator
+
+        return new DelegateResultIterator(delegate.iterator()) {
+            
+            @Override
+            public Tuple next() throws SQLException { // returns the next projected tuple that passes postFilter, or null once super.next() returns null
+                Tuple tuple = null;
+                while (tuple == null) { // keep pulling until a tuple survives the filter
+                    tuple = super.next();
+                    if (tuple == null) {
+                        break; // super.next() returned null: nothing left to return
+                    }
+                    
+                    tuple = tupleProjector.projectResults(tuple); // projection happens first, so the filter sees the projected tuple
+                    
+                    if (postFilter != null) {
+                        postFilter.reset();
+                        try {
+                            if (postFilter.evaluate(tuple, tempPtr)) {
+                                Boolean b = (Boolean)postFilter.getDataType().toObject(tempPtr); // NOTE(review): presumably postFilter is boolean-typed and toObject never returns null here; a null would NPE on the next line - confirm
+                                if (!b.booleanValue()) {
+                                    tuple = null; // filter evaluated to false: skip this tuple
+                                }            
+                            } else {
+                                tuple = null; // evaluate() returned false: skip this tuple
+                            }
+                        } catch (IllegalDataException e) {
+                            tuple = null; // a tuple whose evaluation throws IllegalDataException is skipped; the exception is not propagated
+                        }
+                    }
+                }
+                
+                return tuple;
+            }
+
+            @Override
+            public String toString() { // description naming the projector and the post-filter
+                return "TupleProjectionResultIterator [projector=" + tupleProjector + ", postFilter="
+                        + postFilter + "]";
+            }            
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index b6245ac..5ea11b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -69,16 +69,16 @@ public class HashCacheClient  {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        serialize(ptr, iterator, projector, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
+        serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); // no projector is passed any more: tuples are written exactly as the iterator yields them
         return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
     }
     
-    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
         long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
         estimatedSize = Math.min(estimatedSize, maxSize);
         if (estimatedSize > Integer.MAX_VALUE) {
@@ -99,9 +99,6 @@ public class HashCacheClient  {
             int nRows = 0;
             out.writeInt(nRows); // In the end will be replaced with total number of rows            
             for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
-                if (projector != null) {
-                    result = projector.projectResults(result);
-                }
                 TupleUtil.write(result, out);
                 if (baOut.size() > maxSize) {
                     throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece1a74/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
index 41b3906..e0d9336 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
@@ -250,5 +251,10 @@ public class TupleProjector {
     public ValueBitSet getValueBitSet() {
         return valueSet;
     }
+    
+    @Override
+    public String toString() { // renders the expressions array and the schema of this projector as a single string
+        return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
+    }
 }