You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/10/13 17:05:28 UTC

git commit: PHOENIX-1332 Support correlated subqueries in comparison with ANY/SOME/ALL

Repository: phoenix
Updated Branches:
  refs/heads/master 656acefd1 -> 49ec34be2


PHOENIX-1332 Support correlated subqueries in comparison with ANY/SOME/ALL


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/49ec34be
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/49ec34be
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/49ec34be

Branch: refs/heads/master
Commit: 49ec34be258ce12ca150c5c37a35e2c1cad0105c
Parents: 656acef
Author: maryannxue <ma...@apache.org>
Authored: Mon Oct 13 11:05:00 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Mon Oct 13 11:05:00 2014 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/SubqueryIT.java  |  79 ++++++++
 .../org/apache/phoenix/cache/HashCache.java     |   4 +-
 .../apache/phoenix/compile/JoinCompiler.java    |  15 +-
 .../apache/phoenix/compile/QueryCompiler.java   |   4 +-
 .../phoenix/compile/SubqueryRewriter.java       | 181 +++++++++++++++----
 .../coprocessor/HashJoinRegionScanner.java      |   1 +
 .../apache/phoenix/execute/HashJoinPlan.java    |   5 +-
 .../expression/ArrayConstructorExpression.java  |  16 +-
 .../phoenix/expression/ExpressionType.java      |   2 +
 .../DistinctValueClientAggregator.java          |  63 +++++++
 .../DistinctValueWithCountServerAggregator.java |   2 +-
 .../DistinctValueAggregateFunction.java         |  66 +++++++
 .../apache/phoenix/join/HashCacheClient.java    |   7 +-
 .../apache/phoenix/join/HashCacheFactory.java   |  15 +-
 .../org/apache/phoenix/parse/JoinTableNode.java |   8 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   6 +-
 .../apache/phoenix/parse/ParseNodeRewriter.java |   2 +-
 17 files changed, 417 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
index 58d92f3..e4b4c8b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
@@ -899,6 +899,85 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
             String plan = QueryUtil.getExplainPlan(rs);
             assertTrue("\"" + plan + "\" does not match \"" + plans[4] + "\"", Pattern.matches(plans[4], plan));
+
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+
+            assertFalse(rs.next());
+
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            try {
+                while(rs.next());
+                fail("Should have got exception.");
+            } catch (SQLException e) {                
+            }
+
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' GROUP BY \"order_id\")";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+
+            assertFalse(rs.next());
+
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' GROUP BY \"order_id\")";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            try {
+                while(rs.next());
+                fail("Should have got exception.");
+            } catch (SQLException e) {                
+            }
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testAnyAllComparisonSubquery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = ALL(SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")";
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+
+            assertFalse(rs.next());
+            
+            query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity != ALL(SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+
+            assertFalse(rs.next());
         } finally {
             conn.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
index e604f63..311f119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -18,10 +18,10 @@
 package org.apache.phoenix.cache;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.http.annotation.Immutable;
-
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -34,5 +34,5 @@ import org.apache.phoenix.schema.tuple.Tuple;
  */
 @Immutable
 public interface HashCache extends Closeable {
-    public List<Tuple> get(ImmutableBytesPtr hashKey);
+    public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index d473c97..7e5382e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -187,7 +187,7 @@ public class JoinCompiler {
             if (joinSpecs == null) {
                 joinSpecs = new ArrayList<JoinSpec>();
             }
-            joinSpecs.add(new JoinSpec(joinNode.getType(), joinNode.getOnNode(), joinTable, origResolver));
+            joinSpecs.add(new JoinSpec(joinNode.getType(), joinNode.getOnNode(), joinTable, joinNode.isSingleValueOnly(), origResolver));
             
             return new Pair<Table, List<JoinSpec>>(lhs.getFirst(), joinSpecs);
         }
@@ -362,7 +362,8 @@ public class JoinCompiler {
                             && count > 1 
                             && joinSpecs.get(count - 1).getType() != JoinType.Left
                             && joinSpecs.get(count - 1).getType() != JoinType.Semi
-                            && joinSpecs.get(count - 1).getType() != JoinType.Anti))
+                            && joinSpecs.get(count - 1).getType() != JoinType.Anti
+                            && !joinSpecs.get(count - 1).isSingleValueOnly()))
                 return null;
 
             boolean[] vector = new boolean[count];
@@ -437,13 +438,15 @@ public class JoinCompiler {
         private final JoinType type;
         private final List<ComparisonParseNode> onConditions;
         private final JoinTable joinTable;
+        private final boolean singleValueOnly;
         private Set<TableRef> dependencies;
         
         private JoinSpec(JoinType type, ParseNode onNode, JoinTable joinTable, 
-                ColumnResolver resolver) throws SQLException {
+                boolean singleValueOnly, ColumnResolver resolver) throws SQLException {
             this.type = type;
             this.onConditions = new ArrayList<ComparisonParseNode>();
             this.joinTable = joinTable;
+            this.singleValueOnly = singleValueOnly;
             this.dependencies = new HashSet<TableRef>();
             OnNodeVisitor visitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable);
             onNode.accept(visitor);
@@ -461,6 +464,10 @@ public class JoinCompiler {
             return joinTable;
         }
         
+        public boolean isSingleValueOnly() {
+            return singleValueOnly;
+        }
+        
         public Set<TableRef> getDependencies() {
             return dependencies;
         }
@@ -1177,7 +1184,7 @@ public class JoinCompiler {
                     if (lhs == lhsReplace && rhs == rhsReplace)
                         return joinNode;
 
-                    return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode());
+                    return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode(), joinNode.isSingleValueOnly());
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 52abb9e..a2dc5b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -225,7 +225,7 @@ public class QueryCompiler {
                 if (i < count - 1) {
                     fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
                 }
-                subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters);
+                subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters);
             }
             if (needsProject) {
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
@@ -296,7 +296,7 @@ public class QueryCompiler {
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
             Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
             getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions);
-            return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())});
+            return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())});
         }
         
         // Do not support queries like "A right join B left join C" with hash-joins.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 6428155..4b37259 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -25,9 +25,12 @@ import java.util.List;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.function.DistinctValueAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.ArrayAllComparisonNode;
+import org.apache.phoenix.parse.ArrayAnyComparisonNode;
 import org.apache.phoenix.parse.BooleanParseNodeVisitor;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.ComparisonParseNode;
@@ -36,6 +39,7 @@ import org.apache.phoenix.parse.ExistsParseNode;
 import org.apache.phoenix.parse.InParseNode;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.LiteralParseNode;
+import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.ParseNodeRewriter;
@@ -129,26 +133,32 @@ public class SubqueryRewriter extends ParseNodeRewriter {
 
     @Override
     public ParseNode visitLeave(InParseNode node, List<ParseNode> l) throws SQLException {
+        boolean isTopNode = topNode == node;
+        if (isTopNode) {
+            topNode = null;
+        }
+        
         SubqueryParseNode subqueryNode = (SubqueryParseNode) l.get(1);
         SelectStatement subquery = subqueryNode.getSelectNode();
         String rhsTableAlias = ParseNodeFactory.createTempAlias();
-        List<AliasedNode> selectNodes = fixAliasedNodes(subquery.getSelect());
+        List<AliasedNode> selectNodes = fixAliasedNodes(subquery.getSelect(), true);
         subquery = NODE_FACTORY.select(subquery, true, selectNodes);
         ParseNode onNode = getJoinConditionNode(l.get(0), selectNodes, rhsTableAlias);
         TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
-        JoinType joinType = topNode == node ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
-        ParseNode ret = topNode == node ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
-        tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode);
-        
-        if (topNode == node) {
-            topNode = null;
-        }
+        JoinType joinType = isTopNode ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
+        ParseNode ret = isTopNode ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
+        tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false);
         
         return ret;
     }
 
     @Override
     public ParseNode visitLeave(ExistsParseNode node, List<ParseNode> l) throws SQLException {
+        boolean isTopNode = topNode == node;
+        if (isTopNode) {
+            topNode = null;
+        }
+        
         SubqueryParseNode subqueryNode = (SubqueryParseNode) l.get(0);
         SelectStatement subquery = subqueryNode.getSelectNode();
         String rhsTableAlias = ParseNodeFactory.createTempAlias();
@@ -169,19 +179,20 @@ public class SubqueryRewriter extends ParseNodeRewriter {
         subquery = NODE_FACTORY.select(subquery, true, selectNodes, where);
         ParseNode onNode = conditionExtractor.getJoinCondition();
         TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
-        JoinType joinType = topNode == node ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
-        ParseNode ret = topNode == node ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
-        tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode);
-        
-        if (topNode == node) {
-            topNode = null;
-        }
+        JoinType joinType = isTopNode ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
+        ParseNode ret = isTopNode ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
+        tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false);
         
         return ret;
     }
 
     @Override
     public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> l) throws SQLException {
+        boolean isTopNode = topNode == node;
+        if (isTopNode) {
+            topNode = null;
+        }
+        
         ParseNode secondChild = l.get(1);
         if (!(secondChild instanceof SubqueryParseNode)) {
             return super.visitLeave(node, l);
@@ -200,12 +211,9 @@ public class SubqueryRewriter extends ParseNodeRewriter {
             return super.visitLeave(node, l);
         }
         
-        if (!subquery.isAggregate() || !subquery.getGroupBy().isEmpty()) {
-            //TODO add runtime singleton check or add a "singleton" aggregate funtion
-            throw new SQLFeatureNotSupportedException("Do not support non-aggregate or groupby subquery in comparison.");
-        }
-        
         ParseNode rhsNode = null; 
+        boolean isGroupby = !subquery.getGroupBy().isEmpty();
+        boolean isAggregate = subquery.isAggregate();
         List<AliasedNode> aliasedNodes = subquery.getSelect();
         if (aliasedNodes.size() == 1) {
             rhsNode = aliasedNodes.get(0).getNode();
@@ -221,28 +229,139 @@ public class SubqueryRewriter extends ParseNodeRewriter {
         List<AliasedNode> selectNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + 1);        
         selectNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), rhsNode));
         selectNodes.addAll(additionalSelectNodes);
-        List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size());
-        for (AliasedNode aliasedNode : additionalSelectNodes) {
-            groupbyNodes.add(aliasedNode.getNode());
+        
+        if (!isAggregate) {
+            subquery = NODE_FACTORY.select(subquery, subquery.isDistinct(), selectNodes, where);            
+        } else {
+            List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + subquery.getGroupBy().size());
+            for (AliasedNode aliasedNode : additionalSelectNodes) {
+                groupbyNodes.add(aliasedNode.getNode());
+            }
+            groupbyNodes.addAll(subquery.getGroupBy());
+            subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
         }
         
-        subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
         ParseNode onNode = conditionExtractor.getJoinCondition();
         TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
-        JoinType joinType = topNode == node ? JoinType.Inner : JoinType.Left;
+        JoinType joinType = isTopNode ? JoinType.Inner : JoinType.Left;
         ParseNode ret = NODE_FACTORY.comparison(node.getFilterOp(), l.get(0), NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null));
-        tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode);
+        tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, !isAggregate || isGroupby);
         
-        if (topNode == node) {
+        return ret;
+    }
+
+    @Override
+    public ParseNode visitLeave(ArrayAnyComparisonNode node, List<ParseNode> l) throws SQLException {
+        List<ParseNode> children = leaveArrayComparisonNode(node, l);
+        if (children == l)
+            return super.visitLeave(node, l);
+        
+        node = NODE_FACTORY.arrayAny(children.get(0), (ComparisonParseNode) children.get(1));
+        return node;
+    }
+
+    @Override
+    public ParseNode visitLeave(ArrayAllComparisonNode node, List<ParseNode> l) throws SQLException {
+        List<ParseNode> children = leaveArrayComparisonNode(node, l);
+        if (children == l)
+            return super.visitLeave(node, l);
+        
+        node = NODE_FACTORY.arrayAll(children.get(0), (ComparisonParseNode) children.get(1));
+        return node;
+    }
+    
+    protected List<ParseNode> leaveArrayComparisonNode(ParseNode node, List<ParseNode> l) throws SQLException {
+        boolean isTopNode = topNode == node;
+        if (isTopNode) {
             topNode = null;
         }
+
+        ParseNode firstChild = l.get(0);
+        if (!(firstChild instanceof SubqueryParseNode)) {
+            return l;
+        }
         
-        return ret;
+        SubqueryParseNode subqueryNode = (SubqueryParseNode) firstChild;
+        SelectStatement subquery = subqueryNode.getSelectNode();
+        String rhsTableAlias = ParseNodeFactory.createTempAlias();
+        JoinConditionExtractor conditionExtractor = new JoinConditionExtractor(subquery, resolver, connection, rhsTableAlias);
+        ParseNode where = subquery.getWhere() == null ? null : subquery.getWhere().accept(conditionExtractor);
+        if (where == subquery.getWhere()) { // non-correlated any/all comparison subquery
+            return l;
+        }
+        
+        ParseNode rhsNode = null; 
+        boolean isNonGroupByAggregate = subquery.getGroupBy().isEmpty() && subquery.isAggregate();
+        List<AliasedNode> aliasedNodes = subquery.getSelect();
+        String derivedTableAlias = null;
+        if (!subquery.getGroupBy().isEmpty()) {
+            derivedTableAlias = ParseNodeFactory.createTempAlias();
+            aliasedNodes = fixAliasedNodes(aliasedNodes, false);
+        }
+        
+        if (aliasedNodes.size() == 1) {
+            rhsNode = derivedTableAlias == null ? aliasedNodes.get(0).getNode() : NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), aliasedNodes.get(0).getAlias(), null);
+        } else {
+            List<ParseNode> nodes = Lists.<ParseNode> newArrayListWithExpectedSize(aliasedNodes.size());
+            for (AliasedNode aliasedNode : aliasedNodes) {
+                nodes.add(derivedTableAlias == null ? aliasedNode.getNode() : NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), aliasedNode.getAlias(), null));
+            }
+            rhsNode = NODE_FACTORY.rowValueConstructor(nodes);
+        }
+        
+        if (!isNonGroupByAggregate) {
+            rhsNode = NODE_FACTORY.function(DistinctValueAggregateFunction.NAME, Collections.singletonList(rhsNode));
+        }
+        
+        List<AliasedNode> additionalSelectNodes = conditionExtractor.getAdditionalSelectNodes();
+        List<AliasedNode> selectNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + 1);        
+        selectNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), rhsNode));
+        selectNodes.addAll(additionalSelectNodes);
+        List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size());
+        for (AliasedNode aliasedNode : additionalSelectNodes) {
+            groupbyNodes.add(aliasedNode.getNode());
+        }
+        
+        if (derivedTableAlias == null) {
+            subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
+        } else {
+            List<ParseNode> derivedTableGroupBy = Lists.newArrayListWithExpectedSize(subquery.getGroupBy().size() + groupbyNodes.size());
+            derivedTableGroupBy.addAll(subquery.getGroupBy());
+            derivedTableGroupBy.addAll(groupbyNodes);
+            List<AliasedNode> derivedTableSelect = Lists.newArrayListWithExpectedSize(aliasedNodes.size() + selectNodes.size() - 1);
+            derivedTableSelect.addAll(aliasedNodes);
+            for (int i = 1; i < selectNodes.size(); i++) {
+                AliasedNode aliasedNode = selectNodes.get(i);
+                String alias = ParseNodeFactory.createTempAlias();
+                derivedTableSelect.add(NODE_FACTORY.aliasedNode(alias, aliasedNode.getNode()));
+                aliasedNode = NODE_FACTORY.aliasedNode(aliasedNode.getAlias(), NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), alias, null));
+                selectNodes.set(i, aliasedNode);
+                groupbyNodes.set(i - 1, aliasedNode.getNode());
+            }
+            SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, derivedTableSelect, where, derivedTableGroupBy, true);
+            subquery = NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt)), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, subquery.hasSequence());
+        }
+        
+        ParseNode onNode = conditionExtractor.getJoinCondition();
+        TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
+        JoinType joinType = isTopNode ? JoinType.Inner : JoinType.Left;
+        tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false);
+
+        firstChild = NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null);
+        if (isNonGroupByAggregate) {
+            firstChild = NODE_FACTORY.upsertStmtArrayNode(Collections.singletonList(firstChild)); 
+        }
+        ComparisonParseNode secondChild = (ComparisonParseNode) l.get(1);
+        secondChild = NODE_FACTORY.comparison(secondChild.getFilterOp(), secondChild.getLHS(), NODE_FACTORY.elementRef(Lists.newArrayList(firstChild, NODE_FACTORY.literal(1))));
+        
+        return Lists.newArrayList(firstChild, secondChild);
     }
     
-    private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes) {
-        List<AliasedNode> normNodes = Lists.<AliasedNode> newArrayListWithExpectedSize(nodes.size() + 1);
-        normNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), LiteralParseNode.ONE));
+    private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) {
+        List<AliasedNode> normNodes = Lists.<AliasedNode> newArrayListWithExpectedSize(nodes.size() + (addSelectOne ? 1 : 0));
+        if (addSelectOne) {
+            normNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), LiteralParseNode.ONE));
+        }
         for (int i = 0; i < nodes.size(); i++) {
             AliasedNode aliasedNode = nodes.get(i);
             normNodes.add(NODE_FACTORY.aliasedNode(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 02fc6e3..8e0d42d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -185,6 +185,7 @@ public class HashJoinRegionScanner implements RegionScanner {
             if (postFilter != null) {
                 for (Iterator<Tuple> iter = resultQueue.iterator(); iter.hasNext();) {
                     Tuple t = iter.next();
+                    postFilter.reset();
                     ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
                     try {
                         if (!postFilter.evaluate(t, tempPtr)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 1ac9d68..bb3940c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -395,6 +395,7 @@ public class HashJoinPlan implements QueryPlan {
         private final int index;
         private final QueryPlan plan;
         private final List<Expression> hashExpressions;
+        private final boolean singleValueOnly;
         private final Expression keyRangeLhsExpression;
         private final Expression keyRangeRhsExpression;
         private final TupleProjector clientProjector;
@@ -402,12 +403,14 @@ public class HashJoinPlan implements QueryPlan {
         
         public HashSubPlan(int index, QueryPlan subPlan, 
                 List<Expression> hashExpressions,
+                boolean singleValueOnly,
                 Expression keyRangeLhsExpression, 
                 Expression keyRangeRhsExpression, 
                 TupleProjector clientProjector, boolean hasFilters) {
             this.index = index;
             this.plan = subPlan;
             this.hashExpressions = hashExpressions;
+            this.singleValueOnly = singleValueOnly;
             this.keyRangeLhsExpression = keyRangeLhsExpression;
             this.keyRangeRhsExpression = keyRangeRhsExpression;
             this.clientProjector = clientProjector;
@@ -424,7 +427,7 @@ public class HashJoinPlan implements QueryPlan {
             ServerCache cache = null;
             if (hashExpressions != null) {
                 cache = parent.hashClient.addHashCache(ranges, plan.iterator(), 
-                        clientProjector, plan.getEstimatedSize(), hashExpressions, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
+                        clientProjector, plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
                 long endTime = System.currentTimeMillis();
                 boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
                 if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
index 85eb735..dd23534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
@@ -37,23 +37,25 @@ public class ArrayConstructorExpression extends BaseCompoundExpression {
     // store the offset postion in this.  Later based on the total size move this to a byte[]
     // and serialize into byte stream
     private int[] offsetPos;
+    
+    public ArrayConstructorExpression() { // empty body: init() is not called here. NOTE(review): presumably used when the expression is re-created by deserialization - confirm
+    }
 
     public ArrayConstructorExpression(List<Expression> children, PDataType baseType) {
         super(children);
         init(baseType);
+    }
+
+    private void init(PDataType baseType) { // after this change init() also computes estimatedSize and allocates offsetPos/byteStream
+        this.baseType = baseType;
+        elements = new Object[getChildren().size()]; // one slot per child expression
         estimatedSize = PArrayDataType.estimateSize(this.children.size(), this.baseType);
         if (!this.baseType.isFixedWidth()) {
             offsetPos = new int[children.size()];
             byteStream = new TrustedByteArrayOutputStream(estimatedSize);
         } else {
             byteStream = new TrustedByteArrayOutputStream(estimatedSize);
-        }
-            
-    }
-
-    private void init(PDataType baseType) {
-        this.baseType = baseType;
-        elements = new Object[getChildren().size()];
+        }            
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 1566d64..3219cfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.function.ConvertTimezoneFunction;
 import org.apache.phoenix.expression.function.CountAggregateFunction;
 import org.apache.phoenix.expression.function.DecodeFunction;
 import org.apache.phoenix.expression.function.DistinctCountAggregateFunction;
+import org.apache.phoenix.expression.function.DistinctValueAggregateFunction;
 import org.apache.phoenix.expression.function.EncodeFunction;
 import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
 import org.apache.phoenix.expression.function.FirstValueFunction;
@@ -148,6 +149,7 @@ public enum ExpressionType {
     LowerFunction(LowerFunction.class),
     TrimFunction(TrimFunction.class),
     DistinctCountAggregateFunction(DistinctCountAggregateFunction.class),
+    DistinctValueAggregateFunction(DistinctValueAggregateFunction.class),
     PercentileContAggregateFunction(PercentileContAggregateFunction.class),
     PercentRankAggregateFunction(PercentRankAggregateFunction.class),
     StddevPopFunction(StddevPopFunction.class),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
new file mode 100644
index 0000000..fefb077
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PhoenixArray;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class DistinctValueClientAggregator extends DistinctValueWithCountClientAggregator { // Client-side aggregator whose result is the collected distinct values serialized as one array value.
+    private final PDataType valueType; // type used to decode each distinct key into an array element
+    private final PDataType resultType; // type used to serialize the resulting array; also reported by getResultDataType()
+    
+    public DistinctValueClientAggregator(SortOrder sortOrder, PDataType valueType, PDataType resultType) {
+        super(sortOrder);
+        this.valueType = valueType;
+        this.resultType = resultType;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { // always returns true; the tuple argument is not read
+        if (buffer == null || buffer.length == 0) { // serialize only when buffer holds no non-empty result yet (buffer, valueVsCount and sortOrder are not declared in this class - presumably inherited)
+            Object[] values = new Object[valueVsCount.size()]; // one slot per distinct key; only the keys of valueVsCount are used
+            int i = 0;
+            for (ImmutableBytesPtr key : valueVsCount.keySet()) {
+                values[i++] = valueType.toObject(key, sortOrder);
+            }
+            PhoenixArray array = PArrayDataType.instantiatePhoenixArray(valueType, values);
+            buffer = resultType.toBytes(array, sortOrder); // keep the serialized array so later calls can reuse it
+        }
+        ptr.set(buffer); // point the caller at the serialized array
+        return true;
+    }
+
+    @Override
+    protected PDataType getResultDataType() {
+        return resultType;
+    }
+    
+    @Override
+    protected int getBufferLength() {
+        return 0; // NOTE(review): presumably the superclass sizes buffer from this, which would explain why evaluate() treats an empty buffer as "not built yet" - confirm
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index 4bd2a17..281879e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -54,7 +54,7 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator {
 
     private int compressThreshold;
     private byte[] buffer = null;
-    private Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+    protected Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
 
     public DistinctValueWithCountServerAggregator(Configuration conf) {
         super(SortOrder.getDefault());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java
new file mode 100644
index 0000000..6877409
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.DistinctValueClientAggregator;
+import org.apache.phoenix.expression.aggregator.DistinctValueWithCountClientAggregator;
+import org.apache.phoenix.expression.aggregator.DistinctValueWithCountServerAggregator;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+
+@BuiltInFunction(name=DistinctValueAggregateFunction.NAME, args= {@Argument()} )
+public class DistinctValueAggregateFunction extends DistinctValueWithCountAggregateFunction { // Aggregate function whose data type is an array of its argument's type (VARBINARY elements for array-typed arguments).
+    public static final String NAME = "COLLECTDISTINCT"; // name given to @BuiltInFunction and returned by getName()
+    
+    public DistinctValueAggregateFunction() { // NOTE(review): presumably required so the function can be instantiated without arguments during deserialization - confirm
+    }
+    
+    public DistinctValueAggregateFunction(List<Expression> children) {
+        super(children);
+    }
+
+    @Override
+    public Aggregator newServerAggregator(Configuration conf) {
+        return new DistinctValueWithCountServerAggregator(conf); // server side uses the existing value-with-count aggregator as-is
+    }
+
+    @Override
+    public DistinctValueWithCountClientAggregator newClientAggregator() {
+        PDataType baseType = getAggregatorExpression().getDataType().isArrayType() ? PDataType.VARBINARY : getAggregatorExpression().getDataType(); // array-typed arguments are collected as VARBINARY elements
+        PDataType resultType = PDataType.fromTypeId(baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE); // array type id = element SQL type + ARRAY_TYPE_BASE
+        return new DistinctValueClientAggregator(getAggregatorExpression().getSortOrder(), baseType, resultType);
+    }
+    
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public PDataType getDataType() { // repeats the type computation of newClientAggregator(); the two must be kept in sync
+        PDataType baseType = getAggregatorExpression().getDataType().isArrayType() ? PDataType.VARBINARY : getAggregatorExpression().getDataType();
+        return PDataType.fromTypeId(baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 863b535..b6245ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -69,16 +69,16 @@ public class HashCacheClient  {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        serialize(ptr, iterator, projector, estimatedSize, onExpressions, keyRangeRhsExpression, keyRangeRhsValues);
+        serialize(ptr, iterator, projector, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); // singleValueOnly is forwarded so serialize() can write it into the cache payload
         return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
     }
     
-    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
         long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
         estimatedSize = Math.min(estimatedSize, maxSize);
         if (estimatedSize > Integer.MAX_VALUE) {
@@ -93,6 +93,7 @@ public class HashCacheClient  {
                 WritableUtils.writeVInt(out, ExpressionType.valueOf(expression).ordinal());
                 expression.write(out);                
             }
+            out.writeBoolean(singleValueOnly);
             int exprSize = baOut.size() + Bytes.SIZEOF_INT;
             out.writeInt(exprSize);
             int nRows = 0;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 257fa1f..697d862 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -27,9 +27,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.xerial.snappy.Snappy;
-
 import org.apache.phoenix.cache.HashCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -67,6 +68,7 @@ public class HashCacheFactory implements ServerCacheFactory {
     private class HashCacheImpl implements HashCache {
         private final Map<ImmutableBytesPtr,List<Tuple>> hashCache;
         private final MemoryChunk memoryChunk;
+        private final boolean singleValueOnly;
         
         private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk) {
             try {
@@ -83,6 +85,7 @@ public class HashCacheFactory implements ServerCacheFactory {
                     expression.readFields(dataInput);
                     onExpressions.add(expression);                        
                 }
+                this.singleValueOnly = dataInput.readBoolean();
                 int exprSize = dataInput.readInt();
                 offset += exprSize;
                 int nRows = dataInput.readInt();
@@ -117,8 +120,14 @@ public class HashCacheFactory implements ServerCacheFactory {
         }
         
         @Override
-        public List<Tuple> get(ImmutableBytesPtr hashKey) {
-            return hashCache.get(hashKey);
+        public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException { // returns the tuples cached under hashKey, or null when hashCache has no entry for it
+            List<Tuple> ret = hashCache.get(hashKey);
+            if (singleValueOnly && ret != null && ret.size() > 1) { // in single-value mode, more than one tuple under a key is reported as an error
+                SQLException ex = new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+                ServerUtil.throwIOException(ex.getMessage(), ex); // NOTE(review): presumably always throws, wrapping the SQLException in an IOException - confirm
+            }
+            
+            return ret;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
index a51ca5c..5dd13f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
@@ -43,13 +43,15 @@ public class JoinTableNode extends TableNode {
     private final TableNode lhs;
     private final TableNode rhs;
     private final ParseNode onNode;
+    private final boolean singleValueOnly;
     
-    JoinTableNode(JoinType type, TableNode lhs, TableNode rhs, ParseNode onNode) {
+    JoinTableNode(JoinType type, TableNode lhs, TableNode rhs, ParseNode onNode, boolean singleValueOnly) {
         super(null);
         this.type = type;
         this.lhs = lhs;
         this.rhs = rhs;
         this.onNode = onNode;
+        this.singleValueOnly = singleValueOnly; // stored as given; read back through isSingleValueOnly()
     }
     
     public JoinType getType() {
@@ -67,6 +69,10 @@ public class JoinTableNode extends TableNode {
     public ParseNode getOnNode() {
         return onNode;
     }
+    
+    public boolean isSingleValueOnly() { // returns the flag passed to the constructor
+        return singleValueOnly;
+    }
 
     @Override
     public <T> T accept(TableNodeVisitor<T> visitor) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 85fd978..4f5e7ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -417,7 +417,7 @@ public class ParseNodeFactory {
 
     public TableNode table(TableNode table, List<JoinPartNode> parts) {
         for (JoinPartNode part : parts) {
-            table = new JoinTableNode(part.getType(), table, part.getTable(), part.getOnNode());
+            table = new JoinTableNode(part.getType(), table, part.getTable(), part.getOnNode(), false); // joins folded from JoinPartNodes always pass singleValueOnly=false
         }
         return table;
     }
@@ -426,8 +426,8 @@ public class ParseNodeFactory {
         return new JoinPartNode(type, onNode, table);
     }
 
-    public JoinTableNode join(JoinType type, TableNode lhs, TableNode rhs, ParseNode on) {
-        return new JoinTableNode(type, lhs, rhs, on);
+    public JoinTableNode join(JoinType type, TableNode lhs, TableNode rhs, ParseNode on, boolean singleValueOnly) { // singleValueOnly is handed unchanged to the JoinTableNode constructor
+        return new JoinTableNode(type, lhs, rhs, on, singleValueOnly);
     }
 
     public DerivedTableNode derivedTable (String alias, SelectStatement select) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index 06ac1c6..338a45b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -561,7 +561,7 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
             if (lhsNode == normLhsNode && rhsNode == normRhsNode && onNode == normOnNode)
                 return joinNode;
 
-            return NODE_FACTORY.join(joinNode.getType(), normLhsNode, normRhsNode, normOnNode);
+            return NODE_FACTORY.join(joinNode.getType(), normLhsNode, normRhsNode, normOnNode, joinNode.isSingleValueOnly());
         }
 
         @Override