You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/07/15 07:18:08 UTC

[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5389 Push down PostFilter to Sub-JoinTable for SortMergeJoin and NoStarJoin

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
     new 6bc1f51  PHOENIX-5389 Push down PostFilter to Sub-JoinTable for SortMergeJoin and NoStarJoin
6bc1f51 is described below

commit 6bc1f514f267cfbfcfebd68dcbbef6c85c58013c
Author: chenglei <ch...@apache.org>
AuthorDate: Mon Jul 15 15:17:16 2019 +0800

    PHOENIX-5389 Push down PostFilter to Sub-JoinTable for SortMergeJoin and NoStarJoin
---
 .../org/apache/phoenix/compile/JoinCompiler.java   | 365 +++++++++++++--------
 .../org/apache/phoenix/compile/QueryCompiler.java  |  24 +-
 .../apache/phoenix/compile/SubqueryRewriter.java   |  53 +--
 .../apache/phoenix/optimize/QueryOptimizer.java    |  55 +---
 .../phoenix/parse/AndBooleanParseNodeVisitor.java  |  41 +++
 .../parse/AndRewriterBooleanParseNodeVisitor.java  |  72 ++++
 .../phoenix/compile/JoinQueryCompilerTest.java     |  48 +--
 .../apache/phoenix/compile/QueryCompilerTest.java  | 207 ++++++++++++
 .../java/org/apache/phoenix/util/TestUtil.java     |   8 +-
 9 files changed, 610 insertions(+), 263 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 3803201..f932a32 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -23,6 +23,7 @@ import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCOD
 
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -34,6 +35,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
+
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -46,7 +48,9 @@ import org.apache.phoenix.expression.function.MinAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.AndBooleanParseNodeVisitor;
 import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.AndRewriterBooleanParseNodeVisitor;
 import org.apache.phoenix.parse.BindTableNode;
 import org.apache.phoenix.parse.BooleanParseNodeVisitor;
 import org.apache.phoenix.parse.ColumnDef;
@@ -140,7 +144,7 @@ public class JoinCompiler {
         Pair<Table, List<JoinSpec>> res = select.getFrom().accept(constructor);
         JoinTable joinTable = res.getSecond() == null ? compiler.new JoinTable(res.getFirst()) : compiler.new JoinTable(res.getFirst(), res.getSecond());
         if (select.getWhere() != null) {
-            joinTable.addFilter(select.getWhere());
+            joinTable.pushDownFilter(select.getWhere());
         }
 
         ColumnRefParseNodeVisitor generalRefVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection());
@@ -231,47 +235,47 @@ public class JoinCompiler {
     }
 
     public class JoinTable {
-        private final Table table;
+        private final Table leftTable;
         private final List<JoinSpec> joinSpecs;
-        private final List<ParseNode> postFilters;
-        private final List<Table> tables;
-        private final List<TableRef> tableRefs;
+        private List<ParseNode> postFilters;
+        private final List<Table> allTables;
+        private final List<TableRef> allTableRefs;
         private final boolean allLeftJoin;
         private final boolean isPrefilterAccepted;
         private final List<JoinSpec> prefilterAcceptedTables;
 
         private JoinTable(Table table) {
-            this.table = table;
+            this.leftTable = table;
             this.joinSpecs = Collections.<JoinSpec>emptyList();
-            this.postFilters = Collections.<ParseNode>emptyList();
-            this.tables = Collections.<Table>singletonList(table);
-            this.tableRefs = Collections.<TableRef>singletonList(table.getTableRef());
+            this.postFilters = Collections.EMPTY_LIST;
+            this.allTables = Collections.<Table>singletonList(table);
+            this.allTableRefs = Collections.<TableRef>singletonList(table.getTableRef());
             this.allLeftJoin = false;
             this.isPrefilterAccepted = true;
             this.prefilterAcceptedTables = Collections.<JoinSpec>emptyList();
         }
 
         private JoinTable(Table table, List<JoinSpec> joinSpecs) {
-            this.table = table;
+            this.leftTable = table;
             this.joinSpecs = joinSpecs;
             this.postFilters = new ArrayList<ParseNode>();
-            this.tables = new ArrayList<Table>();
-            this.tableRefs = new ArrayList<TableRef>();
-            this.tables.add(table);
+            this.allTables = new ArrayList<Table>();
+            this.allTableRefs = new ArrayList<TableRef>();
+            this.allTables.add(table);
             boolean allLeftJoin = true;
             int lastRightJoinIndex = -1;
             boolean hasFullJoin = false;
             for (int i = 0; i < joinSpecs.size(); i++) {
                 JoinSpec joinSpec = joinSpecs.get(i);
-                this.tables.addAll(joinSpec.getJoinTable().getTables());
+                this.allTables.addAll(joinSpec.getRhsJoinTable().getAllTables());
                 allLeftJoin = allLeftJoin && joinSpec.getType() == JoinType.Left;
                 hasFullJoin = hasFullJoin || joinSpec.getType() == JoinType.Full;
                 if (joinSpec.getType() == JoinType.Right) {
                     lastRightJoinIndex = i;
                 }
             }
-            for (Table t : this.tables) {
-                this.tableRefs.add(t.getTableRef());
+            for (Table t : this.allTables) {
+                this.allTableRefs.add(t.getTableRef());
             }
             this.allLeftJoin = allLeftJoin;
             this.isPrefilterAccepted = !hasFullJoin && lastRightJoinIndex == -1;
@@ -284,20 +288,24 @@ public class JoinCompiler {
             }
         }
 
-        public Table getTable() {
-            return table;
+        public Table getLeftTable() {
+            return leftTable;
         }
 
         public List<JoinSpec> getJoinSpecs() {
             return joinSpecs;
         }
 
-        public List<Table> getTables() {
-            return tables;
+        public List<Table> getAllTables() {
+            return allTables;
+        }
+
+        public List<TableRef> getAllTableRefs() {
+            return allTableRefs;
         }
 
-        public List<TableRef> getTableRefs() {
-            return tableRefs;
+        public List<TableRef> getLeftTableRef() {
+            return Collections.<TableRef>singletonList(leftTable.getTableRef());
         }
 
         public boolean isAllLeftJoin() {
@@ -320,32 +328,57 @@ public class JoinCompiler {
             return combine(postFilters);
         }
 
-        public void addFilter(ParseNode filter) throws SQLException {
+        public void addPostJoinFilter(ParseNode parseNode) {
+            if(this.postFilters == Collections.EMPTY_LIST) {
+                this.postFilters = new ArrayList<ParseNode>();
+            }
+            this.postFilters.add(parseNode);
+        }
+
+        public void addLeftTableFilter(ParseNode parseNode) {
+            if (isPrefilterAccepted) {
+                leftTable.addFilter(parseNode);
+            } else {
+                addPostJoinFilter(parseNode);
+            }
+        }
+
+        public List<JoinSpec> getPrefilterAcceptedJoinSpecs() {
+            return this.prefilterAcceptedTables;
+        }
+
+        /**
+         * Tries to decompose the filter and push it down to a single table.
+         * @param filter
+         * @throws SQLException
+         */
+        public void pushDownFilter(ParseNode filter) throws SQLException {
             if (joinSpecs.isEmpty()) {
-                table.addFilter(filter);
+                leftTable.addFilter(filter);
                 return;
             }
 
-            WhereNodeVisitor visitor = new WhereNodeVisitor(origResolver, table,
-                    postFilters, Collections.<TableRef>singletonList(table.getTableRef()),
-                    isPrefilterAccepted, prefilterAcceptedTables, statement.getConnection());
+            WhereNodeVisitor visitor = new WhereNodeVisitor(
+                    origResolver,
+                    this,
+                    statement.getConnection());
             filter.accept(visitor);
         }
 
         public void pushDownColumnRefVisitors(ColumnRefParseNodeVisitor generalRefVisitor,
                 ColumnRefParseNodeVisitor joinLocalRefVisitor,
                 ColumnRefParseNodeVisitor prefilterRefVisitor) throws SQLException {
-            for (ParseNode node : table.getPreFilters()) {
+            for (ParseNode node : leftTable.getPreFilters()) {
                 node.accept(prefilterRefVisitor);
             }
-            for (ParseNode node : table.getPostFilters()) {
+            for (ParseNode node : leftTable.getPostFilters()) {
                 node.accept(generalRefVisitor);
             }
             for (ParseNode node : postFilters) {
                 node.accept(generalRefVisitor);
             }
             for (JoinSpec joinSpec : joinSpecs) {
-                JoinTable joinTable = joinSpec.getJoinTable();
+                JoinTable joinTable = joinSpec.getRhsJoinTable();
                 boolean hasSubJoin = !joinTable.getJoinSpecs().isEmpty();
                 for (EqualParseNode node : joinSpec.getOnConditions()) {
                     node.getLHS().accept(generalRefVisitor);
@@ -390,8 +423,8 @@ public class JoinCompiler {
                 JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
                 JoinType type = lastJoinSpec.getType();
                 if ((type == JoinType.Right || type == JoinType.Inner)
-                        && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
-                        && lastJoinSpec.getJoinTable().getTable().isFlat()) {
+                        && lastJoinSpec.getRhsJoinTable().getJoinSpecs().isEmpty()
+                        && lastJoinSpec.getRhsJoinTable().getLeftTable().isFlat()) {
                     strategies.add(Strategy.HASH_BUILD_LEFT);
                 }
                 strategies.add(Strategy.SORT_MERGE);
@@ -408,7 +441,7 @@ public class JoinCompiler {
          */
         public boolean[] getStarJoinVector() {
             int count = joinSpecs.size();
-            if (!table.isFlat() ||
+            if (!leftTable.isFlat() ||
                     (!useStarJoin
                             && count > 1
                             && joinSpecs.get(count - 1).getType() != JoinType.Left
@@ -426,10 +459,10 @@ public class JoinCompiler {
                         && joinSpec.getType() != JoinType.Anti)
                     return null;
                 vector[i] = true;
-                Iterator<TableRef> iter = joinSpec.getDependencies().iterator();
+                Iterator<TableRef> iter = joinSpec.getDependentTableRefs().iterator();
                 while (vector[i] == true && iter.hasNext()) {
                     TableRef tableRef = iter.next();
-                    if (!tableRef.equals(table.getTableRef())) {
+                    if (!tableRef.equals(leftTable.getTableRef())) {
                         vector[i] = false;
                     }
                 }
@@ -438,9 +471,52 @@ public class JoinCompiler {
             return vector;
         }
 
-        public JoinTable getSubJoinTableWithoutPostFilters() {
-            return joinSpecs.size() > 1 ? new JoinTable(table, joinSpecs.subList(0, joinSpecs.size() - 1)) :
-                new JoinTable(table);
+        /**
+         * Creates a new {@link JoinTable} that excludes the last {@link JoinSpec},
+         * and tries to push {@link #postFilters} down to the new {@link JoinTable}.
+         * @param phoenixConnection
+         * @return
+         * @throws SQLException
+         */
+        public JoinTable createSubJoinTable(
+                PhoenixConnection phoenixConnection) throws SQLException {
+            assert joinSpecs.size() > 0;
+            JoinTable newJoinTablesContext = joinSpecs.size() > 1 ?
+                    new JoinTable(leftTable, joinSpecs.subList(0, joinSpecs.size() - 1)) :
+                    new JoinTable(leftTable);
+            JoinType rightmostJoinType = joinSpecs.get(joinSpecs.size() - 1).getType();
+            if(rightmostJoinType == JoinType.Right || rightmostJoinType == JoinType.Full) {
+                return newJoinTablesContext;
+            }
+
+            if(this.postFilters.isEmpty()) {
+                return newJoinTablesContext;
+            }
+
+            PushDownPostFilterParseNodeVisitor pushDownPostFilterNodeVistor =
+                    new PushDownPostFilterParseNodeVisitor(
+                            JoinCompiler.this.origResolver,
+                            newJoinTablesContext,
+                            phoenixConnection);
+            int index = 0;
+            List<ParseNode> newPostFilterParseNodes = null;
+            for(ParseNode postFilterParseNode : this.postFilters) {
+                ParseNode newPostFilterParseNode =
+                        postFilterParseNode.accept(pushDownPostFilterNodeVistor);
+                if(newPostFilterParseNode != postFilterParseNode &&
+                   newPostFilterParseNodes == null) {
+                    newPostFilterParseNodes =
+                            new ArrayList<ParseNode>(this.postFilters.subList(0, index));
+                }
+                if(newPostFilterParseNodes != null && newPostFilterParseNode != null) {
+                    newPostFilterParseNodes.add(newPostFilterParseNode);
+                }
+                index++;
+            }
+            if(newPostFilterParseNodes != null) {
+                this.postFilters = newPostFilterParseNodes;
+            }
+            return newJoinTablesContext;
         }
 
         public SelectStatement getAsSingleSubquery(SelectStatement query, boolean asSubquery) throws SQLException {
@@ -453,14 +529,15 @@ public class JoinCompiler {
         }
 
         public boolean hasPostReference() {
-            for (Table table : tables) {
+            for (Table table : allTables) {
                 if (table.isWildCardSelect()) {
                     return true;
                 }
             }
 
             for (Map.Entry<ColumnRef, ColumnRefType> e : columnRefs.entrySet()) {
-                if (e.getValue() == ColumnRefType.GENERAL && tableRefs.contains(e.getKey().getTableRef())) {
+                if (e.getValue() == ColumnRefType.GENERAL &&
+                    allTableRefs.contains(e.getKey().getTableRef())) {
                     return true;
                 }
             }
@@ -472,11 +549,11 @@ public class JoinCompiler {
            if (!postFilters.isEmpty())
                return true;
 
-           if (isPrefilterAccepted && table.hasFilters())
+           if (isPrefilterAccepted && leftTable.hasFilters())
                return true;
 
            for (JoinSpec joinSpec : prefilterAcceptedTables) {
-               if (joinSpec.getJoinTable().hasFilters())
+               if (joinSpec.getRhsJoinTable().hasFilters())
                    return true;
            }
 
@@ -487,25 +564,38 @@ public class JoinCompiler {
     public class JoinSpec {
         private final JoinType type;
         private final List<EqualParseNode> onConditions;
-        private final JoinTable joinTable;
+        private final JoinTable rhsJoinTable;
         private final boolean singleValueOnly;
-        private Set<TableRef> dependencies;
+        private Set<TableRef> dependentTableRefs;
         private OnNodeVisitor onNodeVisitor;
 
         private JoinSpec(JoinType type, ParseNode onNode, JoinTable joinTable,
                 boolean singleValueOnly, ColumnResolver resolver) throws SQLException {
             this.type = type;
             this.onConditions = new ArrayList<EqualParseNode>();
-            this.joinTable = joinTable;
+            this.rhsJoinTable = joinTable;
             this.singleValueOnly = singleValueOnly;
-            this.dependencies = new HashSet<TableRef>();
-            this.onNodeVisitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable, statement.getConnection());
+            this.dependentTableRefs = new HashSet<TableRef>();
+            this.onNodeVisitor = new OnNodeVisitor(resolver, this, statement.getConnection());
             if (onNode != null) {
-                onNode.accept(this.onNodeVisitor);
+                this.pushDownOnCondition(onNode);
             }
         }
 
-        public void addOnCondition(ParseNode node) throws SQLException {
+        /**
+         * <pre>
+         * 1. In the {@link JoinSpec} ctor, try to push a filter in the join on clause to the where
+         *    clause, e.g. for "a join b on a.id = b.id and b.code = 1 where a.name is not null",
+         *    try to push "b.code = 1" in the join on clause to the where clause.
+         * 2. In {@link WhereNodeVisitor#visitLeave(ComparisonParseNode, List)}, for an inner join,
+         *    try to push a join condition in the where clause to the join on clause,
+         *    e.g. for "a join b on a.id = b.id where a.name = b.name", try to push
+         *    "a.name = b.name" in the where clause to the join on clause.
+         * </pre>
+         * @param node
+         * @throws SQLException
+         */
+        public void pushDownOnCondition(ParseNode node) throws SQLException {
             node.accept(onNodeVisitor);
         }
 
@@ -517,16 +607,32 @@ public class JoinCompiler {
             return onConditions;
         }
 
-        public JoinTable getJoinTable() {
-            return joinTable;
+        public JoinTable getRhsJoinTable() {
+            return rhsJoinTable;
+        }
+
+        public List<TableRef>  getRhsJoinTableRefs() {
+            return this.rhsJoinTable.getAllTableRefs();
+        }
+
+        public void pushDownFilterToRhsJoinTable(ParseNode parseNode) throws SQLException {
+             this.rhsJoinTable.pushDownFilter(parseNode);
+        }
+
+        public void addOnCondition(EqualParseNode equalParseNode) {
+            this.onConditions.add(equalParseNode);
+        }
+
+        public void addDependentTableRefs(Collection<TableRef> tableRefs) {
+            this.dependentTableRefs.addAll(tableRefs);
         }
 
         public boolean isSingleValueOnly() {
             return singleValueOnly;
         }
 
-        public Set<TableRef> getDependencies() {
-            return dependencies;
+        public Set<TableRef> getDependentTableRefs() {
+            return dependentTableRefs;
         }
 
         public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, Strategy strategy) throws SQLException {
@@ -889,28 +995,53 @@ public class JoinCompiler {
         }
     }
 
-    private static class WhereNodeVisitor extends BooleanParseNodeVisitor<Void> {
-        private Table table;
-        private List<ParseNode> postFilters;
-        private List<TableRef> selfTableRefs;
-        private boolean isPrefilterAccepted;
-        private List<JoinSpec> prefilterAcceptedTables;
-        ColumnRefParseNodeVisitor columnRefVisitor;
-
-        public WhereNodeVisitor(ColumnResolver resolver, Table table,
-                List<ParseNode> postFilters, List<TableRef> selfTableRefs, boolean isPrefilterAccepted,
-                List<JoinSpec> prefilterAcceptedTables, PhoenixConnection connection) {
-            this.table = table;
-            this.postFilters = postFilters;
-            this.selfTableRefs = selfTableRefs;
-            this.isPrefilterAccepted = isPrefilterAccepted;
-            this.prefilterAcceptedTables = prefilterAcceptedTables;
-            this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver, connection);
+    /**
+     * Pushes down the {@link JoinTable#postFilters} of the outermost JoinTable to the
+     * {@link JoinTable#postFilters} of the sub-JoinTable.
+     */
+    private static class PushDownPostFilterParseNodeVisitor extends AndRewriterBooleanParseNodeVisitor {
+        private ColumnRefParseNodeVisitor columnRefParseNodeVisitor;
+        /**
+         * Sub-JoinTable to accept pushed down PostFilters.
+         */
+        private JoinTable joinTable;
+
+        public PushDownPostFilterParseNodeVisitor(
+                ColumnResolver resolver,
+                JoinTable joinTablesContext,
+                PhoenixConnection connection) {
+            super(NODE_FACTORY);
+            this.joinTable = joinTablesContext;
+            this.columnRefParseNodeVisitor = new ColumnRefParseNodeVisitor(resolver, connection);
         }
 
         @Override
-        protected boolean enterBooleanNode(ParseNode node) throws SQLException {
-            return false;
+        protected ParseNode leaveBooleanNode(
+                ParseNode parentParseNode, List<ParseNode> childParseNodes) throws SQLException {
+            columnRefParseNodeVisitor.reset();
+            parentParseNode.accept(columnRefParseNodeVisitor);
+            ColumnRefParseNodeVisitor.ColumnRefType columnRefType =
+                    columnRefParseNodeVisitor.getContentType(
+                            this.joinTable.getAllTableRefs());
+            if(columnRefType == ColumnRefParseNodeVisitor.ColumnRefType.NONE ||
+               columnRefType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY){
+                this.joinTable.postFilters.add(parentParseNode);
+                return null;
+            }
+            return parentParseNode;
+        }
+    }
+
+    private static class WhereNodeVisitor extends AndBooleanParseNodeVisitor<Void> {
+        private ColumnRefParseNodeVisitor columnRefVisitor;
+        private JoinTable joinTable;
+
+        public WhereNodeVisitor(
+                ColumnResolver resolver,
+                JoinTable joinTablesContext,
+                PhoenixConnection connection) {
+            this.joinTable = joinTablesContext;
+            this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver, connection);
         }
 
         @Override
@@ -918,53 +1049,42 @@ public class JoinCompiler {
                 List<Void> l) throws SQLException {
             columnRefVisitor.reset();
             node.accept(columnRefVisitor);
-            ColumnRefParseNodeVisitor.ColumnRefType type = columnRefVisitor.getContentType(selfTableRefs);
+            ColumnRefParseNodeVisitor.ColumnRefType type =
+                    columnRefVisitor.getContentType(this.joinTable.getLeftTableRef());
             switch (type) {
             case NONE:
             case SELF_ONLY:
-                if (isPrefilterAccepted) {
-                    table.addFilter(node);
-                } else {
-                    postFilters.add(node);
-                }
+                this.joinTable.addLeftTableFilter(node);
                 break;
             case FOREIGN_ONLY:
                 JoinTable matched = null;
-                for (JoinSpec joinSpec : prefilterAcceptedTables) {
-                    if (columnRefVisitor.getContentType(joinSpec.getJoinTable().getTableRefs()) == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
-                        matched = joinSpec.getJoinTable();
+                for (JoinSpec joinSpec : this.joinTable.getPrefilterAcceptedJoinSpecs()) {
+                    if (columnRefVisitor.getContentType(
+                            joinSpec.getRhsJoinTable().getAllTableRefs()) ==
+                        ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
+                        matched = joinSpec.getRhsJoinTable();
                         break;
                     }
                 }
                 if (matched != null) {
-                    matched.addFilter(node);
+                    matched.pushDownFilter(node);
                 } else {
-                    postFilters.add(node);
+                    this.joinTable.addPostJoinFilter(node);
                 }
                 break;
             default:
-                postFilters.add(node);
+                this.joinTable.addPostJoinFilter(node);
                 break;
             }
             return null;
         }
 
         @Override
-        protected boolean enterNonBooleanNode(ParseNode node) throws SQLException {
-            return false;
-        }
-
-        @Override
         protected Void leaveNonBooleanNode(ParseNode node, List<Void> l) throws SQLException {
             return null;
         }
 
         @Override
-        public boolean visitEnter(AndParseNode node) throws SQLException {
-            return true;
-        }
-
-        @Override
         public Void visitLeave(AndParseNode node, List<Void> l) throws SQLException {
             return null;
         }
@@ -975,7 +1095,10 @@ public class JoinCompiler {
             if (!(node instanceof EqualParseNode))
                 return leaveBooleanNode(node, l);
 
-            ListIterator<JoinSpec> iter = prefilterAcceptedTables.listIterator(prefilterAcceptedTables.size());
+            List<JoinSpec> prefilterAcceptedJoinSpecs =
+                    this.joinTable.getPrefilterAcceptedJoinSpecs();
+            ListIterator<JoinSpec> iter =
+                    prefilterAcceptedJoinSpecs.listIterator(prefilterAcceptedJoinSpecs.size());
             while (iter.hasPrevious()) {
                 JoinSpec joinSpec = iter.previous();
                 if (joinSpec.getType() != JoinType.Inner || joinSpec.isSingleValueOnly()) {
@@ -983,7 +1106,7 @@ public class JoinCompiler {
                 }
 
                 try {
-                    joinSpec.addOnCondition(node);
+                    joinSpec.pushDownOnCondition(node);
                     return null;
                 } catch (SQLException e) {
                 }
@@ -993,33 +1116,26 @@ public class JoinCompiler {
         }
     }
 
-    private static class OnNodeVisitor extends BooleanParseNodeVisitor<Void> {
-        private List<EqualParseNode> onConditions;
-        private Set<TableRef> dependencies;
-        private JoinTable joinTable;
-        private ColumnRefParseNodeVisitor columnRefVisitor;
+    private static class OnNodeVisitor extends AndBooleanParseNodeVisitor<Void> {
+        private final ColumnRefParseNodeVisitor columnRefVisitor;
+        private final JoinSpec joinSpec;
 
-        public OnNodeVisitor(ColumnResolver resolver, List<EqualParseNode> onConditions,
-                Set<TableRef> dependencies, JoinTable joinTable, PhoenixConnection connection) {
-            this.onConditions = onConditions;
-            this.dependencies = dependencies;
-            this.joinTable = joinTable;
+        public OnNodeVisitor(
+                ColumnResolver resolver, JoinSpec joinSpec, PhoenixConnection connection) {
+            this.joinSpec = joinSpec;
             this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver, connection);
         }
-        @Override
-        protected boolean enterBooleanNode(ParseNode node) throws SQLException {
-            return false;
-        }
 
         @Override
         protected Void leaveBooleanNode(ParseNode node,
                 List<Void> l) throws SQLException {
             columnRefVisitor.reset();
             node.accept(columnRefVisitor);
-            ColumnRefParseNodeVisitor.ColumnRefType type = columnRefVisitor.getContentType(joinTable.getTableRefs());
+            ColumnRefParseNodeVisitor.ColumnRefType type =
+                    columnRefVisitor.getContentType(this.joinSpec.getRhsJoinTableRefs());
             if (type == ColumnRefParseNodeVisitor.ColumnRefType.NONE
                     || type == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
-                joinTable.addFilter(node);
+                this.joinSpec.pushDownFilterToRhsJoinTable(node);
             } else {
                 throwAmbiguousJoinConditionException();
             }
@@ -1027,20 +1143,11 @@ public class JoinCompiler {
         }
 
         @Override
-        protected boolean enterNonBooleanNode(ParseNode node) throws SQLException {
-            return false;
-        }
-
-        @Override
         protected Void leaveNonBooleanNode(ParseNode node, List<Void> l) throws SQLException {
             return null;
         }
 
         @Override
-        public boolean visitEnter(AndParseNode node) throws SQLException {
-            return true;
-        }
-        @Override
         public Void visitLeave(AndParseNode node, List<Void> l) throws SQLException {
             return null;
         }
@@ -1052,23 +1159,25 @@ public class JoinCompiler {
                 return leaveBooleanNode(node, l);
             columnRefVisitor.reset();
             node.getLHS().accept(columnRefVisitor);
-            ColumnRefParseNodeVisitor.ColumnRefType lhsType = columnRefVisitor.getContentType(joinTable.getTableRefs());
+            ColumnRefParseNodeVisitor.ColumnRefType lhsType =
+                    columnRefVisitor.getContentType(this.joinSpec.getRhsJoinTableRefs());
             Set<TableRef> lhsTableRefSet = Sets.newHashSet(columnRefVisitor.getTableRefSet());
             columnRefVisitor.reset();
             node.getRHS().accept(columnRefVisitor);
-            ColumnRefParseNodeVisitor.ColumnRefType rhsType = columnRefVisitor.getContentType(joinTable.getTableRefs());
+            ColumnRefParseNodeVisitor.ColumnRefType rhsType =
+                    columnRefVisitor.getContentType(this.joinSpec.getRhsJoinTableRefs());
             Set<TableRef> rhsTableRefSet = Sets.newHashSet(columnRefVisitor.getTableRefSet());
             if ((lhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY || lhsType == ColumnRefParseNodeVisitor.ColumnRefType.NONE)
                     && (rhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY || rhsType == ColumnRefParseNodeVisitor.ColumnRefType.NONE)) {
-                joinTable.addFilter(node);
+                this.joinSpec.pushDownFilterToRhsJoinTable(node);
             } else if (lhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY
                     && rhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
-                onConditions.add((EqualParseNode) node);
-                dependencies.addAll(lhsTableRefSet);
+                this.joinSpec.addOnCondition((EqualParseNode) node);
+                this.joinSpec.addDependentTableRefs(lhsTableRefSet);
             } else if (rhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY
                     && lhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
-                onConditions.add(NODE_FACTORY.equal(node.getRHS(), node.getLHS()));
-                dependencies.addAll(rhsTableRefSet);
+                this.joinSpec.addOnCondition(NODE_FACTORY.equal(node.getRHS(), node.getLHS()));
+                this.joinSpec.addDependentTableRefs(rhsTableRefSet);
             } else {
                 throwAmbiguousJoinConditionException();
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index e7d8e18..e149c77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -215,7 +215,7 @@ public class QueryCompiler {
      */
     protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
         if (joinTable.getJoinSpecs().isEmpty()) {
-            Table table = joinTable.getTable();
+            Table table = joinTable.getLeftTable();
             SelectStatement subquery = table.getAsSubquery(orderBy);
             if (!table.isSubselect()) {
                 context.setCurrentTable(table.getTableRef());
@@ -280,7 +280,7 @@ public class QueryCompiler {
         switch (strategy) {
             case HASH_BUILD_RIGHT: {
                 boolean[] starJoinVector = joinTable.getStarJoinVector();
-                Table table = joinTable.getTable();
+                Table table = joinTable.getLeftTable();
                 PTable initialProjectedTable;
                 TableRef tableRef;
                 SelectStatement query;
@@ -317,8 +317,14 @@ public class QueryCompiler {
                     JoinSpec joinSpec = joinSpecs.get(i);
                     Scan subScan = ScanUtil.newScan(originalScan);
                     subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                    subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
-                    boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
+                    subPlans[i] = compileJoinQuery(
+                            subContexts[i],
+                            binds,
+                            joinSpec.getRhsJoinTable(),
+                            true,
+                            true,
+                            null);
+                    boolean hasPostReference = joinSpec.getRhsJoinTable().hasPostReference();
                     if (hasPostReference) {
                         tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
                         projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
@@ -366,9 +372,9 @@ public class QueryCompiler {
             case HASH_BUILD_LEFT: {
                 JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
                 JoinType type = lastJoinSpec.getType();
-                JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
-                Table rhsTable = rhsJoinTable.getTable();
-                JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+                JoinTable rhsJoinTable = lastJoinSpec.getRhsJoinTable();
+                Table rhsTable = rhsJoinTable.getLeftTable();
+                JoinTable lhsJoin = joinTable.createSubJoinTable(statement.getConnection());
                 Scan subScan = ScanUtil.newScan(originalScan);
                 StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
                 QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
@@ -428,10 +434,10 @@ public class QueryCompiler {
                 return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, usePersistentCache, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
             }
             case SORT_MERGE: {
-                JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+                JoinTable lhsJoin =  joinTable.createSubJoinTable(statement.getConnection());
                 JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
                 JoinType type = lastJoinSpec.getType();
-                JoinTable rhsJoin = lastJoinSpec.getJoinTable();
+                JoinTable rhsJoin = lastJoinSpec.getRhsJoinTable();
                 if (type == JoinType.Right) {
                     JoinTable temp = lhsJoin;
                     lhsJoin = rhsJoin;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index f051aa5..dd9e62b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.expression.function.DistinctValueAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.AndRewriterBooleanParseNodeVisitor;
 import org.apache.phoenix.parse.ArrayAllComparisonNode;
 import org.apache.phoenix.parse.ArrayAnyComparisonNode;
 import org.apache.phoenix.parse.BooleanParseNodeVisitor;
@@ -408,7 +409,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
         return count == 1 ? equalNodes.get(0) : NODE_FACTORY.and(equalNodes);
     }
     
-    private static class JoinConditionExtractor extends BooleanParseNodeVisitor<ParseNode> {
+    private static class JoinConditionExtractor extends AndRewriterBooleanParseNodeVisitor {
         private final TableName tableName;
         private ColumnResolveVisitor columnResolveVisitor;
         private List<AliasedNode> additionalSelectNodes;
@@ -416,6 +417,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
         
         public JoinConditionExtractor(SelectStatement subquery, ColumnResolver outerResolver, 
                 PhoenixConnection connection, String tableAlias) throws SQLException {
+            super(NODE_FACTORY);
             this.tableName = NODE_FACTORY.table(null, tableAlias);
             ColumnResolver localResolver = FromCompiler.getResolverForQuery(subquery, connection);
             this.columnResolveVisitor = new ColumnResolveVisitor(localResolver, outerResolver);
@@ -436,43 +438,6 @@ public class SubqueryRewriter extends ParseNodeRewriter {
             
             return NODE_FACTORY.and(this.joinConditions);            
         }
-        
-        @Override
-        public List<ParseNode> newElementList(int size) {
-            return Lists.<ParseNode> newArrayListWithExpectedSize(size);
-        }
-
-        @Override
-        public void addElement(List<ParseNode> l, ParseNode element) {
-            if (element != null) {
-                l.add(element);
-            }
-        }
-
-        @Override
-        public boolean visitEnter(AndParseNode node) throws SQLException {
-            return true;
-        }
-
-        @Override
-        public ParseNode visitLeave(AndParseNode node, List<ParseNode> l)
-                throws SQLException {
-            if (l.equals(node.getChildren()))
-                return node;
-
-            if (l.isEmpty())
-                return null;
-            
-            if (l.size() == 1)
-                return l.get(0);
-            
-            return NODE_FACTORY.and(l);
-        }
-
-        @Override
-        protected boolean enterBooleanNode(ParseNode node) throws SQLException {
-            return false;
-        }
 
         @Override
         protected ParseNode leaveBooleanNode(ParseNode node, List<ParseNode> l)
@@ -488,18 +453,6 @@ public class SubqueryRewriter extends ParseNodeRewriter {
         }
 
         @Override
-        protected boolean enterNonBooleanNode(ParseNode node)
-                throws SQLException {
-            return false;
-        }
-
-        @Override
-        protected ParseNode leaveNonBooleanNode(ParseNode node,
-                List<ParseNode> l) throws SQLException {
-            return node;
-        }
-
-        @Override
         public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> l) throws SQLException {
             if (node.getFilterOp() != CompareFilter.CompareOp.EQUAL)
                 return leaveBooleanNode(node, l);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 4f0dfeb..45c7e7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.AndRewriterBooleanParseNodeVisitor;
 import org.apache.phoenix.parse.BindTableNode;
 import org.apache.phoenix.parse.BooleanParseNodeVisitor;
 import org.apache.phoenix.parse.ColumnParseNode;
@@ -143,7 +144,7 @@ public class QueryOptimizer {
                 || (select.getWhere() != null && select.getWhere().hasSubquery())) {
             JoinCompiler.JoinTable join = JoinCompiler.compile(statement, select, resolver);
             Map<TableRef, TableRef> replacement = null;
-            for (JoinCompiler.Table table : join.getTables()) {
+            for (JoinCompiler.Table table : join.getAllTables()) {
                 if (table.isSubselect())
                     continue;
                 TableRef tableRef = table.getTableRef();
@@ -547,12 +548,13 @@ public class QueryOptimizer {
     }
 
     
-    private static class WhereConditionRewriter extends BooleanParseNodeVisitor<ParseNode> {
+    private static class WhereConditionRewriter extends AndRewriterBooleanParseNodeVisitor {
         private final ColumnResolver dataResolver;
         private final ExpressionCompiler expressionCompiler;
         private List<ParseNode> extractedConditions;
         
         public WhereConditionRewriter(ColumnResolver dataResolver, StatementContext context) throws SQLException {
+            super(FACTORY);
             this.dataResolver = dataResolver;
             this.expressionCompiler = new ExpressionCompiler(context);
             this.extractedConditions = Lists.<ParseNode> newArrayList();
@@ -567,43 +569,6 @@ public class QueryOptimizer {
             
             return FACTORY.and(this.extractedConditions);            
         }
-        
-        @Override
-        public List<ParseNode> newElementList(int size) {
-            return Lists.<ParseNode> newArrayListWithExpectedSize(size);
-        }
-
-        @Override
-        public void addElement(List<ParseNode> l, ParseNode element) {
-            if (element != null) {
-                l.add(element);
-            }
-        }
-
-        @Override
-        public boolean visitEnter(AndParseNode node) throws SQLException {
-            return true;
-        }
-
-        @Override
-        public ParseNode visitLeave(AndParseNode node, List<ParseNode> l)
-                throws SQLException {
-            if (l.equals(node.getChildren()))
-                return node;
-
-            if (l.isEmpty())
-                return null;
-            
-            if (l.size() == 1)
-                return l.get(0);
-            
-            return FACTORY.and(l);
-        }
-
-        @Override
-        protected boolean enterBooleanNode(ParseNode node) throws SQLException {
-            return false;
-        }
 
         @Override
         protected ParseNode leaveBooleanNode(ParseNode node, List<ParseNode> l)
@@ -619,18 +584,6 @@ public class QueryOptimizer {
             
             return translatedNode;
         }
-
-        @Override
-        protected boolean enterNonBooleanNode(ParseNode node)
-                throws SQLException {
-            return false;
-        }
-
-        @Override
-        protected ParseNode leaveNonBooleanNode(ParseNode node,
-                List<ParseNode> l) throws SQLException {
-            return node;
-        }
     }
 
     private static SelectStatement rewriteQueryWithIndexReplacement(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AndBooleanParseNodeVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndBooleanParseNodeVisitor.java
new file mode 100644
index 0000000..2b9b816
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndBooleanParseNodeVisitor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+
+/**
+ * Visitor that descends only into {@link AndParseNode}; no other node type is entered.
+ */
+public abstract class AndBooleanParseNodeVisitor<T> extends BooleanParseNodeVisitor<T> {
+
+    @Override
+    protected boolean enterBooleanNode(ParseNode parseNode) throws SQLException {
+        return false;
+    }
+
+    @Override
+    protected boolean enterNonBooleanNode(ParseNode parseNode) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean visitEnter(AndParseNode andParseNode) throws SQLException {
+        return true;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AndRewriterBooleanParseNodeVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndRewriterBooleanParseNodeVisitor.java
new file mode 100644
index 0000000..905e18f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AndRewriterBooleanParseNodeVisitor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Base visitor for rewriting a {@link ParseNode}; it descends only into {@link AndParseNode}.
+ * An example is {@link org.apache.phoenix.optimize.QueryOptimizer.WhereConditionRewriter}, which
+ * rewrites columns in the data table to columns in the index table, and removes parse nodes that
+ * reference columns not in the index table.
+ */
+public abstract class AndRewriterBooleanParseNodeVisitor extends AndBooleanParseNodeVisitor<ParseNode>{
+
+    private final ParseNodeFactory parseNodeFactory ;
+
+    public AndRewriterBooleanParseNodeVisitor(ParseNodeFactory parseNodeFactory) {
+        this.parseNodeFactory = parseNodeFactory;
+    }
+
+    @Override
+    public List<ParseNode> newElementList(int size) {
+        return Lists.<ParseNode> newArrayListWithExpectedSize(size);
+    }
+
+    @Override
+    public void addElement(List<ParseNode> childParseNodeResults, ParseNode newChildParseNode) {
+        if (newChildParseNode != null) {
+            childParseNodeResults.add(newChildParseNode);
+        }
+    }
+
+    @Override
+    protected ParseNode leaveNonBooleanNode(ParseNode parentParseNode, List<ParseNode> childParseNodes) throws SQLException {
+        return parentParseNode;
+    }
+
+    @Override
+    public ParseNode visitLeave(AndParseNode andParseNode, List<ParseNode> childParseNodes) throws SQLException {
+        if (childParseNodes.equals(andParseNode.getChildren())) {
+            return andParseNode;
+        }
+
+        if (childParseNodes.isEmpty()) {
+            return null;
+        }
+
+        if (childParseNodes.size() == 1) {
+            return childParseNodes.get(0);
+        }
+
+        return this.parseNodeFactory.and(childParseNodes);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java
index 8c1f536..d091e69 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java
@@ -114,51 +114,51 @@ public class JoinQueryCompilerTest extends BaseConnectionlessQueryTest {
 
         String query = String.format(queryTemplate, "INNER", "INNER");
         JoinTable joinTable = getJoinTable(query, pconn);
-        assertEquals(1, joinTable.getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(1, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
 
         query = String.format(queryTemplate, "INNER", "LEFT");
         joinTable = getJoinTable(query, pconn);
-        assertEquals(1, joinTable.getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(0, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(1, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(0, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
 
         query = String.format(queryTemplate, "INNER", "RIGHT");
         joinTable = getJoinTable(query, pconn);
-        assertEquals(0, joinTable.getTable().getPreFilters().size());
-        assertEquals(0, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(0, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(0, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
 
         query = String.format(queryTemplate, "LEFT", "INNER");
         joinTable = getJoinTable(query, pconn);
-        assertEquals(1, joinTable.getTable().getPreFilters().size());
-        assertEquals(0, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(1, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(0, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
 
         query = String.format(queryTemplate, "LEFT", "LEFT");
         joinTable = getJoinTable(query, pconn);
-        assertEquals(1, joinTable.getTable().getPreFilters().size());
-        assertEquals(0, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(0, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(1, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(0, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(0, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
 
         query = String.format(queryTemplate, "LEFT", "RIGHT");
         joinTable = getJoinTable(query, pconn);
-        assertEquals(0, joinTable.getTable().getPreFilters().size());
-        assertEquals(0, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(0, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(0, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
 
         query = String.format(queryTemplate, "RIGHT", "INNER");
         joinTable = getJoinTable(query, pconn);
-        assertEquals(0, joinTable.getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(0, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
 
         query = String.format(queryTemplate, "RIGHT", "RIGHT");
         joinTable = getJoinTable(query, pconn);
-        assertEquals(0, joinTable.getTable().getPreFilters().size());
-        assertEquals(0, joinTable.getJoinSpecs().get(0).getJoinTable().getTable().getPreFilters().size());
-        assertEquals(1, joinTable.getJoinSpecs().get(1).getJoinTable().getTable().getPreFilters().size());
+        assertEquals(0, joinTable.getLeftTable().getPreFilters().size());
+        assertEquals(0, joinTable.getJoinSpecs().get(0).getRhsJoinTable().getLeftTable().getPreFilters().size());
+        assertEquals(1, joinTable.getJoinSpecs().get(1).getRhsJoinTable().getLeftTable().getPreFilters().size());
     }
     
     private static JoinTable getJoinTable(String query, PhoenixConnection connection) throws SQLException {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 746e348..340c7c8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.execute.ClientScanPlan;
 import org.apache.phoenix.execute.CorrelatePlan;
 import org.apache.phoenix.execute.CursorFetchPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
 import org.apache.phoenix.execute.LiteralResultIterationPlan;
 import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.execute.SortMergeJoinPlan;
@@ -5978,4 +5979,210 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             }
         }
     }
+
+    @Test
+    public void testPushDownPostFilterToSubJoinBug5389() throws Exception {
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            String orderTableName = "order_table";
+            String itemTableName = "item_table";
+            String supplierTableName = "supplier_table";
+            String sql = "create table " + orderTableName +
+                    "   (order_id varchar(15) not null primary key, " +
+                    "    customer_id varchar(10), " +
+                    "    item_id varchar(10), " +
+                    "    price integer, " +
+                    "    quantity integer, " +
+                    "    date timestamp)";
+            conn.createStatement().execute(sql);
+
+            sql = "create table " + itemTableName +
+                    "   (item_id varchar(10) not null primary key, " +
+                    "    name varchar, " +
+                    "    price integer, " +
+                    "    discount1 integer, " +
+                    "    discount2 integer, " +
+                    "    supplier_id varchar(10), " +
+                    "    description varchar)";
+            conn.createStatement().execute(sql);
+
+            sql = "create table " + supplierTableName +
+                    "   (supplier_id varchar(10) not null primary key, " +
+                    "    name varchar, " +
+                    "    phone varchar(12), " +
+                    "    address varchar, " +
+                    "    loc_id varchar(5))";
+            conn.createStatement().execute(sql);
+
+            doTestPushDownPostFilterToSubJoinForNoStarJoinBug5389(conn, supplierTableName, itemTableName, orderTableName);
+            doTestPushDownPostFilterToSubJoinForSortMergeJoinBug5389(conn, supplierTableName, itemTableName, orderTableName);
+        } finally {
+            if(conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+    private void doTestPushDownPostFilterToSubJoinForNoStarJoinBug5389(
+            Connection conn,
+            String supplierTableName,
+            String itemTableName,
+            String orderTableName) throws Exception {
+        //one condition push down.
+        String sql = "select /*+ NO_STAR_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName + " s inner join " + itemTableName + " i on  s.supplier_id = i.supplier_id "+
+                "inner join " + orderTableName + " o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.supplier_id != 'medi' or s.address = 'hai')";
+        QueryPlan queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        HashJoinPlan hashJoinPlan = (HashJoinPlan)queryPlan;
+        assertTrue(hashJoinPlan.getJoinInfo().getPostJoinFilterExpression() == null);
+        HashSubPlan[] hashSubPlans = (HashSubPlan[])hashJoinPlan.getSubPlans();
+        assertTrue(hashSubPlans.length == 1);
+        HashJoinPlan subHashJoinPlan = (HashJoinPlan)(hashSubPlans[0].getInnerPlan());
+        Expression postFilterExpression = subHashJoinPlan.getJoinInfo().getPostJoinFilterExpression();
+        assertTrue(postFilterExpression.toString().equals(
+                "(I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai')"));
+
+        //a postFilter that references all tables cannot be pushed down to the subjoin.
+        sql = "select /*+ NO_STAR_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName + " s inner join " + itemTableName + " i on  s.supplier_id = i.supplier_id "+
+                "inner join " + orderTableName + " o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.supplier_id != 'medi' or s.address = 'hai' or o.quantity = 8)";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        hashJoinPlan = (HashJoinPlan)queryPlan;
+        assertTrue(hashJoinPlan.getJoinInfo().getPostJoinFilterExpression().toString().equals(
+                "(I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai' OR O.QUANTITY = 8)"));
+        hashSubPlans = (HashSubPlan[])hashJoinPlan.getSubPlans();
+        assertTrue(hashSubPlans.length == 1);
+        subHashJoinPlan = (HashJoinPlan)(hashSubPlans[0].getInnerPlan());
+        assertTrue(subHashJoinPlan.getJoinInfo().getPostJoinFilterExpression() == null);
+
+        //one condition cannot be pushed down; the other two conditions can be pushed down.
+        sql = "select /*+ NO_STAR_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName + " s inner join " + itemTableName + " i on  s.supplier_id = i.supplier_id "+
+                "inner join " + orderTableName + " o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.description= 'desc1' or o.quantity > 10) and (i.supplier_id != 'medi' or s.address = 'hai') and (i.name is not null or s.loc_id != '8')";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        hashJoinPlan = (HashJoinPlan)queryPlan;
+        assertTrue(hashJoinPlan.getJoinInfo().getPostJoinFilterExpression().toString().equals(
+                "(I.DESCRIPTION = 'desc1' OR O.QUANTITY > 10)"));
+        hashSubPlans = (HashSubPlan[])hashJoinPlan.getSubPlans();
+        assertTrue(hashSubPlans.length == 1);
+        subHashJoinPlan = (HashJoinPlan)(hashSubPlans[0].getInnerPlan());
+        postFilterExpression = subHashJoinPlan.getJoinInfo().getPostJoinFilterExpression();
+        assertTrue(postFilterExpression.toString().equals(
+                "((I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai') AND (I.NAME IS NOT NULL OR S.LOC_ID != '8'))"));
+
+        //for a right join, the postFilter cannot be pushed down.
+        sql = "select /*+ NO_STAR_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName + " s inner join " + itemTableName + " i on  s.supplier_id = i.supplier_id "+
+                "right join " + orderTableName + " o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.supplier_id != 'medi' or s.address = 'hai')";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        hashJoinPlan = (HashJoinPlan)queryPlan;
+        assertTrue(hashJoinPlan.getJoinInfo().getPostJoinFilterExpression().toString().equals(
+                "(I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai')"));
+        hashSubPlans = (HashSubPlan[])hashJoinPlan.getSubPlans();
+        assertTrue(hashSubPlans.length == 1);
+        subHashJoinPlan = (HashJoinPlan)(hashSubPlans[0].getInnerPlan());
+        assertTrue(subHashJoinPlan.getJoinInfo().getPostJoinFilterExpression() == null);
+
+        //for a right join, none of the conditions can be pushed down.
+        sql = "select /*+ NO_STAR_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName + " s inner join " + itemTableName + " i on  s.supplier_id = i.supplier_id "+
+                "right join " + orderTableName + " o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.description= 'desc1' or o.quantity > 10) and (i.supplier_id != 'medi' or s.address = 'hai') and (i.name is not null or s.loc_id != '8')";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        hashJoinPlan = (HashJoinPlan)queryPlan;
+        assertTrue(hashJoinPlan.getJoinInfo().getPostJoinFilterExpression().toString().equals(
+                "((I.DESCRIPTION = 'desc1' OR O.QUANTITY > 10) AND (I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai') AND (I.NAME IS NOT NULL OR S.LOC_ID != '8'))"));
+        hashSubPlans = (HashSubPlan[])hashJoinPlan.getSubPlans();
+        assertTrue(hashSubPlans.length == 1);
+        subHashJoinPlan = (HashJoinPlan)(hashSubPlans[0].getInnerPlan());
+        assertTrue(subHashJoinPlan.getJoinInfo().getPostJoinFilterExpression() == null);
+    }
+
+    private void doTestPushDownPostFilterToSubJoinForSortMergeJoinBug5389(
+            Connection conn,
+            String supplierTableName,
+            String itemTableName,
+            String orderTableName) throws Exception { // PHOENIX-5389: asserts where each WHERE condition ends up in USE_SORT_MERGE_JOIN plans.
+        // Case 1: the condition on only i and s is pushed down to the lhs sub-join; the outer plan keeps no WHERE.
+        String sql = "select /*+ USE_SORT_MERGE_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName+" s inner join "+itemTableName+" i on  s.supplier_id = i.supplier_id "+
+                "inner join "+orderTableName+" o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.supplier_id != 'medi' or s.address = 'hai')";
+        QueryPlan queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        ClientScanPlan clientScanPlan = (ClientScanPlan)queryPlan;
+        assertTrue(clientScanPlan.getWhere() == null);
+        SortMergeJoinPlan sortMergeJoinPlan = (SortMergeJoinPlan)clientScanPlan.getDelegate();
+        ClientScanPlan lhsClientScanPlan = (ClientScanPlan)sortMergeJoinPlan.getLhsPlan();
+        assertTrue(lhsClientScanPlan.getWhere().toString().equals(
+                "(I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai')"));
+
+        // Case 2: the condition also references o.quantity, so it stays on the outer plan and the lhs sub-join gets no WHERE.
+        sql = "select /*+ USE_SORT_MERGE_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName+" s inner join "+itemTableName+" i on  s.supplier_id = i.supplier_id "+
+                "inner join "+orderTableName+" o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.supplier_id != 'medi' or s.address = 'hai' or o.quantity = 8)";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        clientScanPlan = (ClientScanPlan)queryPlan;
+        assertTrue(clientScanPlan.getWhere().toString().equals(
+                "(I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai' OR O.QUANTITY = 8)"));
+        sortMergeJoinPlan = (SortMergeJoinPlan)clientScanPlan.getDelegate();
+        lhsClientScanPlan = (ClientScanPlan)sortMergeJoinPlan.getLhsPlan();
+        assertTrue(lhsClientScanPlan.getWhere() == null);
+
+        // Case 3: the condition referencing o.quantity stays on the outer plan; the two conditions on only i and s are pushed down and ANDed.
+        sql = "select /*+ USE_SORT_MERGE_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName+" s inner join "+itemTableName+" i on  s.supplier_id = i.supplier_id "+
+                "inner join "+orderTableName+" o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.description= 'desc1' or o.quantity > 10) and (i.supplier_id != 'medi' or s.address = 'hai') and (i.name is not null or s.loc_id != '8')";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        clientScanPlan = (ClientScanPlan)queryPlan;
+        assertTrue(clientScanPlan.getWhere().toString().equals(
+                "(I.DESCRIPTION = 'desc1' OR O.QUANTITY > 10)"));
+        sortMergeJoinPlan = (SortMergeJoinPlan)clientScanPlan.getDelegate();
+        lhsClientScanPlan = (ClientScanPlan)sortMergeJoinPlan.getLhsPlan();
+        assertTrue(lhsClientScanPlan.getWhere().toString().equals(
+                "((I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai') AND (I.NAME IS NOT NULL OR S.LOC_ID != '8'))"));
+
+        // Case 4: for a right join the condition is not pushed down; it stays on the outer plan.
+        sql = "select /*+ USE_SORT_MERGE_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName+" s inner join "+itemTableName+" i on  s.supplier_id = i.supplier_id "+
+                "right join "+orderTableName+" o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.supplier_id != 'medi' or s.address = 'hai')";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        clientScanPlan = (ClientScanPlan)queryPlan;
+        assertTrue(clientScanPlan.getWhere().toString().equals(
+                "(I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai')"));
+        sortMergeJoinPlan = (SortMergeJoinPlan)clientScanPlan.getDelegate();
+        // For a right join, SortMergeJoinPlan exchanges left and right, so the sub-join is read from getRhsPlan().
+        ClientScanPlan rhsClientScanPlan = (ClientScanPlan)sortMergeJoinPlan.getRhsPlan();
+        assertTrue(rhsClientScanPlan.getWhere() == null);
+
+        // Case 5: for a full join nothing is pushed down; all four conditions stay on the outer plan.
+        sql = "select /*+ USE_SORT_MERGE_JOIN */ COALESCE(o.order_id,'empty_order_id'),i.item_id, i.discount2+5, s.supplier_id, lower(s.name) from "+
+                supplierTableName+" s inner join "+itemTableName+" i on  s.supplier_id = i.supplier_id "+
+                "full join "+orderTableName+" o on  i.item_id = o.item_id "+
+                "where (o.price < 10 or o.price > 20) and "+
+                "(i.description= 'desc1' or o.quantity > 10) and (i.supplier_id != 'medi' or s.address = 'hai') and (i.name is not null or s.loc_id != '8')";
+        queryPlan = TestUtil.getOptimizeQueryPlanNoIterator(conn, sql);
+        clientScanPlan = (ClientScanPlan)queryPlan;
+        assertTrue(clientScanPlan.getWhere().toString().equals(
+                "((O.PRICE < 10 OR O.PRICE > 20) AND (I.DESCRIPTION = 'desc1' OR O.QUANTITY > 10) AND (I.SUPPLIER_ID != 'medi' OR S.ADDRESS = 'hai') AND (I.NAME IS NOT NULL OR S.LOC_ID != '8'))"));
+        sortMergeJoinPlan = (SortMergeJoinPlan)clientScanPlan.getDelegate();
+        lhsClientScanPlan = (ClientScanPlan)sortMergeJoinPlan.getLhsPlan();
+        assertTrue(lhsClientScanPlan.getWhere() == null);
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index ae5a84e..fbe047c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1085,13 +1085,19 @@ public class TestUtil {
         return ByteUtil.compare(op, compareResult);
     }
 
-    public static QueryPlan getOptimizeQueryPlan(Connection conn,String sql) throws SQLException {
+    public static QueryPlan getOptimizeQueryPlan(Connection conn, String sql) throws SQLException { // Optimizes sql into a QueryPlan and calls iterator() on it before returning.
         PhoenixPreparedStatement statement = conn.prepareStatement(sql).unwrap(PhoenixPreparedStatement.class);
         QueryPlan queryPlan = statement.optimizeQuery(sql);
         queryPlan.iterator();
         return queryPlan;
     }
 
+    public static QueryPlan getOptimizeQueryPlanNoIterator(Connection conn, String sql) throws SQLException {
+        // Optimize only: unlike getOptimizeQueryPlan, iterator() is never called on the returned plan.
+        PhoenixPreparedStatement phoenixStatement = conn.prepareStatement(sql).unwrap(PhoenixPreparedStatement.class);
+        return phoenixStatement.optimizeQuery(sql);
+    }
+
     public static void assertResultSet(ResultSet rs,Object[][] rows) throws Exception {
         for(int rowIndex=0; rowIndex < rows.length; rowIndex++) {
             assertTrue("rowIndex:["+rowIndex+"] rs.next error!",rs.next());