You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2018/11/06 23:14:23 UTC

[2/6] phoenix git commit: PHOENIX-4981 Add tests for ORDER BY, GROUP BY and salted tables using phoenix-spark

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
index c9168f1..69c9869 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
@@ -37,104 +37,18 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Test;
 
-
 /**
  * Tests for table with transparent salting.
  */
 
-public class SaltedTableIT extends ParallelStatsDisabledIT {
-
-	private static String getUniqueTableName() {
-		return SchemaUtil.getTableName(generateUniqueName(), generateUniqueName());
-	}
-	
-    private static String initTableValues(byte[][] splits) throws Exception {
-    	String tableName = getUniqueTableName();    	
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        
-        // Rows we inserted:
-        // 1ab123abc111
-        // 1abc456abc111
-        // 1de123abc111
-        // 2abc123def222 
-        // 3abc123ghi333
-        // 4abc123jkl444
-        try {
-            // Upsert with no column specifies.
-            ensureTableCreated(getUrl(), tableName, TABLE_WITH_SALTING, splits, null, null);
-            String query = "UPSERT INTO " + tableName + " VALUES(?,?,?,?,?)";
-            PreparedStatement stmt = conn.prepareStatement(query);
-            stmt.setInt(1, 1);
-            stmt.setString(2, "ab");
-            stmt.setString(3, "123");
-            stmt.setString(4, "abc");
-            stmt.setInt(5, 111);
-            stmt.execute();
-            conn.commit();
-            
-            stmt.setInt(1, 1);
-            stmt.setString(2, "abc");
-            stmt.setString(3, "456");
-            stmt.setString(4, "abc");
-            stmt.setInt(5, 111);
-            stmt.execute();
-            conn.commit();
-            
-            // Test upsert when statement explicitly specifies the columns to upsert into.
-            query = "UPSERT INTO " + tableName +
-                    " (a_integer, a_string, a_id, b_string, b_integer) " + 
-                    " VALUES(?,?,?,?,?)";
-            stmt = conn.prepareStatement(query);
-            
-            stmt.setInt(1, 1);
-            stmt.setString(2, "de");
-            stmt.setString(3, "123");
-            stmt.setString(4, "abc");
-            stmt.setInt(5, 111);
-            stmt.execute();
-            conn.commit();
-            
-            stmt.setInt(1, 2);
-            stmt.setString(2, "abc");
-            stmt.setString(3, "123");
-            stmt.setString(4, "def");
-            stmt.setInt(5, 222);
-            stmt.execute();
-            conn.commit();
-            
-            // Test upsert when order of column is shuffled.
-            query = "UPSERT INTO " + tableName +
-                    " (a_string, a_integer, a_id, b_string, b_integer) " + 
-                    " VALUES(?,?,?,?,?)";
-            stmt = conn.prepareStatement(query);
-            stmt.setString(1, "abc");
-            stmt.setInt(2, 3);
-            stmt.setString(3, "123");
-            stmt.setString(4, "ghi");
-            stmt.setInt(5, 333);
-            stmt.execute();
-            conn.commit();
-            
-            stmt.setString(1, "abc");
-            stmt.setInt(2, 4);
-            stmt.setString(3, "123");
-            stmt.setString(4, "jkl");
-            stmt.setInt(5, 444);
-            stmt.execute();
-            conn.commit();
-        } finally {
-            conn.close();
-        }
-        return tableName;
-    }
+public class SaltedTableIT extends BaseSaltedTableIT {
 
     @Test
     public void testTableWithInvalidBucketNumber() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
-            String query = "create table " + getUniqueTableName() + " (a_integer integer not null CONSTRAINT pk PRIMARY KEY (a_integer)) SALT_BUCKETS = 257";
+            String query = "create table " + generateUniqueName() + " (a_integer integer not null CONSTRAINT pk PRIMARY KEY (a_integer)) SALT_BUCKETS = 257";
             PreparedStatement stmt = conn.prepareStatement(query);
             stmt.execute();
             fail("Should have caught exception");
@@ -148,370 +62,12 @@ public class SaltedTableIT extends ParallelStatsDisabledIT {
     @Test
     public void testTableWithSplit() throws Exception {
         try {
-            createTestTable(getUrl(), "create table " + getUniqueTableName() + " (a_integer integer not null primary key) SALT_BUCKETS = 4",
+            createTestTable(getUrl(), "create table " + generateUniqueName() + " (a_integer integer not null primary key) SALT_BUCKETS = 4",
                     new byte[][] {{1}, {2,3}, {2,5}, {3}}, null);
             fail("Should have caught exception");
         } catch (SQLException e) {
             assertTrue(e.getMessage(), e.getMessage().contains("ERROR 1022 (42Y81): Should not specify split points on salted table with default row key order."));
         }
     }
-    
-    @Test
-    public void testSelectValueNoWhereClause() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            String tableName = initTableValues(null);
-            
-            String query = "SELECT * FROM " + tableName;
-            PreparedStatement statement = conn.prepareStatement(query);
-            ResultSet rs = statement.executeQuery();
-            
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("ab", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("456", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("de", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("def", rs.getString(4));
-            assertEquals(222, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("ghi", rs.getString(4));
-            assertEquals(333, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(4, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("jkl", rs.getString(4));
-            assertEquals(444, rs.getInt(5));
-            
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
-    public void testSelectValueWithFullyQualifiedWhereClause() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            String tableName = initTableValues(null);
-            String query;
-            PreparedStatement stmt;
-            ResultSet rs;
-            
-            // Variable length slot with bounded ranges.
-            query = "SELECT * FROM " + tableName + 
-                    " WHERE a_integer = 1 AND a_string >= 'ab' AND a_string < 'de' AND a_id = '123'";
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("ab", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            assertFalse(rs.next());
-
-            // all single slots with one value.
-            query = "SELECT * FROM " + tableName + 
-                    " WHERE a_integer = 1 AND a_string = 'ab' AND a_id = '123'";
-            stmt = conn.prepareStatement(query);
-            
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("ab", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            assertFalse(rs.next());
-            
-            // all single slots with multiple values.
-            query = "SELECT * FROM " + tableName + 
-                    " WHERE a_integer in (2, 4) AND a_string = 'abc' AND a_id = '123'";
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("def", rs.getString(4));
-            assertEquals(222, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(4, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("jkl", rs.getString(4));
-            assertEquals(444, rs.getInt(5));
-            assertFalse(rs.next());
-            
-            query = "SELECT a_integer, a_string FROM " + tableName +
-                    " WHERE a_integer in (1,2,3,4) AND a_string in ('a', 'abc', 'de') AND a_id = '123'";
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("de", rs.getString(2));
-            
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            
-            assertTrue(rs.next());
-            assertEquals(4, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertFalse(rs.next());
-            
-            // fixed length slot with bounded ranges.
-            query = "SELECT a_string, a_id FROM " + tableName + 
-                    " WHERE a_integer > 1 AND a_integer < 4 AND a_string = 'abc' AND a_id = '123'";
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals("abc", rs.getString(1));
-            assertEquals("123", rs.getString(2));
-            
-            assertTrue(rs.next());
-            assertEquals("abc", rs.getString(1));
-            assertEquals("123", rs.getString(2));
-            assertFalse(rs.next());
-            
-            // fixed length slot with unbound ranges.
-            query = "SELECT b_string, b_integer FROM " + tableName + 
-                    " WHERE a_integer > 1 AND a_string = 'abc' AND a_id = '123'";
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals("def", rs.getString(1));
-            assertEquals(222, rs.getInt(2));
-            
-            assertTrue(rs.next());
-            assertEquals("ghi", rs.getString(1));
-            assertEquals(333, rs.getInt(2));
-            
-            assertTrue(rs.next());
-            assertEquals("jkl", rs.getString(1));
-            assertEquals(444, rs.getInt(2));
-            assertFalse(rs.next());
-            
-            // Variable length slot with unbounded ranges.
-            query = "SELECT * FROM " + tableName + 
-                    " WHERE a_integer = 1 AND a_string > 'ab' AND a_id = '123'";
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("de", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            assertFalse(rs.next());
-
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
-    public void testSelectValueWithNotFullyQualifiedWhereClause() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            String tableName = initTableValues(null);
-            
-            // Where without fully qualified key, point query.
-            String query = "SELECT * FROM " + tableName + " WHERE a_integer = ? AND a_string = ?";
-            PreparedStatement stmt = conn.prepareStatement(query);
-            
-            stmt.setInt(1, 1);
-            stmt.setString(2, "abc");
-            ResultSet rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("456", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            assertFalse(rs.next());
-            
-            // Where without fully qualified key, range query.
-            query = "SELECT * FROM " + tableName + " WHERE a_integer >= 2";
-            stmt = conn.prepareStatement(query);
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("def", rs.getString(4));
-            assertEquals(222, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("ghi", rs.getString(4));
-            assertEquals(333, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(4, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("jkl", rs.getString(4));
-            assertEquals(444, rs.getInt(5));
-            assertFalse(rs.next());
-            
-            // With point query.
-            query = "SELECT a_string FROM " + tableName + " WHERE a_string = ?";
-            stmt = conn.prepareStatement(query);
-            stmt.setString(1, "de");
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals("de", rs.getString(1));
-            assertFalse(rs.next());
-            
-            query = "SELECT a_id FROM " + tableName + " WHERE a_id = ?";
-            stmt = conn.prepareStatement(query);
-            stmt.setString(1, "456");
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals("456", rs.getString(1));
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
-    public void testSelectWithGroupBy() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            String tableName = initTableValues(null);
-            
-            String query = "SELECT a_integer FROM " + tableName + " GROUP BY a_integer";
-            PreparedStatement stmt = conn.prepareStatement(query);
-            ResultSet rs = stmt.executeQuery();
-            int count = 0;
-            while (rs.next()) {
-                count++;
-            }
-            assertEquals("Group by does not return the right count.", count, 4);
-        } finally {
-            conn.close();
-        }
-    }
 
-    @Test
-    public void testLimitScan() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            String tableName = initTableValues(null);
-            
-            String query = "SELECT a_integer FROM " + tableName + " WHERE a_string='abc' LIMIT 1";
-            PreparedStatement stmt = conn.prepareStatement(query);
-            ResultSet rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
-    
-    @Test
-    public void testSelectWithOrderByRowKey() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            String tableName = initTableValues(null);
-            
-            String query = "SELECT * FROM " + tableName + " ORDER  BY  a_integer, a_string, a_id";
-            PreparedStatement statement = conn.prepareStatement(query);
-            ResultSet explainPlan = statement.executeQuery("EXPLAIN " + query);
-            // Confirm that ORDER BY in row key order will be optimized out for salted table
-            assertEquals("CLIENT PARALLEL 4-WAY FULL SCAN OVER " + tableName + "\n" + 
-                    "CLIENT MERGE SORT", QueryUtil.getExplainPlan(explainPlan));
-            ResultSet rs = statement.executeQuery();
-            
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("ab", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("456", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertEquals("de", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("abc", rs.getString(4));
-            assertEquals(111, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("def", rs.getString(4));
-            assertEquals(222, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("ghi", rs.getString(4));
-            assertEquals(333, rs.getInt(5));
-            
-            assertTrue(rs.next());
-            assertEquals(4, rs.getInt(1));
-            assertEquals("abc", rs.getString(2));
-            assertEquals("123", rs.getString(3));
-            assertEquals("jkl", rs.getString(4));
-            assertEquals(444, rs.getInt(5));
-            
-            assertFalse(rs.next());
-        } finally {
-            conn.close();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java
new file mode 100644
index 0000000..afce0dd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.phoenix.parse.HintNode;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.phoenix.util.SchemaUtil.getEscapedFullColumnName;
+
+public class QueryBuilder {
+
+    private String fullTableName;
+    // regular columns that are in the select clause
+    private List<String> selectColumns = Collections.emptyList();
+
+    // columns that are required for expressions in the select clause
+    private List<String> selectExpressionColumns  = Collections.emptyList();
+    // expression string in the select clause (for eg COL1 || COL2)
+    private String selectExpression;
+    private String whereClause;
+    private String orderByClause;
+    private String groupByClause;
+    private String havingClause;
+    private HintNode.Hint hint;
+    private boolean escapeCols;
+    private boolean distinct;
+    private int limit;
+
+    public String getFullTableName() {
+        return fullTableName;
+    }
+
+    /**
+     * @return column names required to evaluate this select statement
+     */
+    public List<String> getRequiredColumns() {
+        List<String> allColumns = Lists.newArrayList(selectColumns);
+        if (!CollectionUtils.isEmpty(selectExpressionColumns)) {
+            allColumns.addAll(selectExpressionColumns);
+        }
+        return allColumns;
+    }
+
+    public String getWhereClause() {
+        return whereClause;
+    }
+
+    public HintNode.Hint getHint() {
+        return hint;
+    }
+
+    public String getOrderByClause() {
+        return orderByClause;
+    }
+
+    public String getGroupByClause() {
+        return groupByClause;
+    }
+
+    public QueryBuilder setOrderByClause(String orderByClause) {
+        this.orderByClause = orderByClause;
+        return this;
+    }
+
+    public QueryBuilder setFullTableName(String fullTableName) {
+        this.fullTableName = fullTableName;
+        return this;
+    }
+
+    public QueryBuilder setSelectColumns(List<String> columns) {
+        this.selectColumns = columns;
+        return this;
+    }
+
+    public QueryBuilder setWhereClause(String whereClause) {
+        this.whereClause = whereClause;
+        return this;
+    }
+
+    public QueryBuilder setHint(HintNode.Hint hint) {
+        this.hint = hint;
+        return this;
+    }
+
+    public QueryBuilder setEscapeCols(boolean escapeCols) {
+        this.escapeCols = escapeCols;
+        return this;
+    }
+
+    public QueryBuilder setGroupByClause(String groupByClause) {
+        this.groupByClause = groupByClause;
+        return this;
+    }
+
+    public QueryBuilder setHavingClause(String havingClause) {
+        this.havingClause = havingClause;
+        return this;
+    }
+
+    public List<String> getSelectExpressionColumns() {
+        return selectExpressionColumns;
+    }
+
+    public QueryBuilder setSelectExpressionColumns(List<String> selectExpressionColumns) {
+        this.selectExpressionColumns = selectExpressionColumns;
+        return this;
+    }
+
+    public String getSelectExpression() {
+        return selectExpression;
+    }
+
+    public QueryBuilder setSelectExpression(String selectExpression) {
+        this.selectExpression = selectExpression;
+        return this;
+    }
+
+    public QueryBuilder setDistinct(boolean distinct) {
+        this.distinct = distinct;
+        return this;
+    }
+
+    public QueryBuilder setLimit(int limit) {
+        this.limit = limit;
+        return this;
+    }
+
+    public String build() {
+        Preconditions.checkNotNull(fullTableName, "Table name cannot be null");
+        if (CollectionUtils.isEmpty(selectColumns) && StringUtils.isBlank(selectExpression)) {
+            throw new IllegalArgumentException("At least one column or select expression must be provided");
+        }
+        StringBuilder query = new StringBuilder();
+        query.append("SELECT ");
+
+        if (distinct) {
+            query.append(" DISTINCT ");
+        }
+
+        if (hint != null) {
+            final HintNode node = new HintNode(hint.name());
+            String hintStr = node.toString();
+            query.append(hintStr);
+        }
+
+        StringBuilder selectClauseBuilder = new StringBuilder();
+        if (StringUtils.isNotBlank(selectExpression)) {
+            if (selectClauseBuilder.length()!=0) {
+                selectClauseBuilder.append(" , ");
+            }
+            selectClauseBuilder.append(selectExpression);
+        }
+
+        boolean first = true;
+        for (String col : selectColumns) {
+            if (StringUtils.isNotBlank(col)) {
+                if ((first && selectClauseBuilder.length()!=0) || !first) {
+                    selectClauseBuilder.append(" , ");
+                }
+                String fullColumnName = col;
+                if (escapeCols) {
+                    fullColumnName = getEscapedFullColumnName(col);
+                }
+                selectClauseBuilder.append(fullColumnName);
+                first = false;
+            }
+        }
+
+        query.append(selectClauseBuilder);
+        query.append(" FROM ");
+        query.append(fullTableName);
+        if (StringUtils.isNotBlank(whereClause)) {
+            query.append(" WHERE (").append(whereClause).append(")");
+        }
+        if (StringUtils.isNotBlank(groupByClause)) {
+            query.append(" GROUP BY ").append(groupByClause);
+        }
+        if (StringUtils.isNotBlank(havingClause)) {
+            query.append(" HAVING ").append(havingClause);
+        }
+        if (StringUtils.isNotBlank(orderByClause)) {
+            query.append(" ORDER BY ").append(orderByClause);
+        }
+        if (limit > 0) {
+            query.append(" LIMIT ").append(limit);
+        }
+        return query.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 94cbfea..4501158 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -217,45 +217,15 @@ public final class QueryUtil {
      *
      * @param fullTableName name of the table for which the select statement needs to be created.
      * @param columns list of columns to be projected in the select statement.
-     * @param conditions The condition clause to be added to the WHERE condition
+     * @param whereClause The condition clause to be added to the WHERE condition
      * @param hint hint to use
      * @param escapeCols whether to escape the projected columns
      * @return Select Query
      */
     public static String constructSelectStatement(String fullTableName, List<String> columns,
-            final String conditions, Hint hint, boolean escapeCols) {
-        Preconditions.checkNotNull(fullTableName, "Table name cannot be null");
-        if (columns == null || columns.isEmpty()) {
-            throw new IllegalArgumentException("At least one column must be provided");
-        }
-        StringBuilder query = new StringBuilder();
-        query.append("SELECT ");
-
-        String hintStr = "";
-        if (hint != null) {
-            final HintNode node = new HintNode(hint.name());
-            hintStr = node.toString();
-        }
-        query.append(hintStr);
-
-        for (String col : columns) {
-            if (col != null) {
-                String fullColumnName = col;
-                if (escapeCols) {
-                    fullColumnName = getEscapedFullColumnName(col);
-                }
-                query.append(fullColumnName);
-                query.append(",");
-            }
-        }
-        // Remove the trailing comma
-        query.setLength(query.length() - 1);
-        query.append(" FROM ");
-        query.append(fullTableName);
-        if (conditions != null && conditions.length() > 0) {
-            query.append(" WHERE (").append(conditions).append(")");
-        }
-        return query.toString();
+            final String whereClause, Hint hint, boolean escapeCols) {
+        return new QueryBuilder().setFullTableName(fullTableName).setSelectColumns(columns)
+                .setWhereClause(whereClause).setHint(hint).setEscapeCols(escapeCols).build();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
index c6bb739..a904bca 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
@@ -47,7 +47,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest {
         String sqlStr =
                 IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, columnNames,
                     SCRUTINY_TIME_MILLIS);
-        assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))",
+        assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))",
             sqlStr);
     }
 
@@ -58,7 +58,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest {
         String query =
                 IndexScrutinyTableOutput.getSqlQueryMissingTargetRows(conn, columnNames,
                     SCRUTINY_TIME_MILLIS);
-        assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))",
+        assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))",
             query);
     }
 
@@ -69,7 +69,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest {
         String query =
                 IndexScrutinyTableOutput.getSqlQueryBadCoveredColVal(conn, columnNames,
                     SCRUTINY_TIME_MILLIS);
-        assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))",
+        assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))",
             query);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index f864dd5..0c4c004 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -145,7 +145,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
             configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
             PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
-            final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + tableName ; 
+            final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + tableName ;
             assertEquals(expectedSelectStatement, selectStatement);
         } finally {
             conn.close();
@@ -167,7 +167,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
             configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
             PhoenixConfigurationUtil.setInputTableName(configuration, fullTableName);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
-            final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + fullTableName; 
+            final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + fullTableName;
             assertEquals(expectedSelectStatement, selectStatement);
         } finally {
             conn.close();
@@ -209,7 +209,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
             PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
             PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
-            final String expectedSelectStatement = "SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + tableName ; 
+            final String expectedSelectStatement = "SELECT \"ID\" , \"0\".\"VCARRAY\" FROM " + tableName ;
             assertEquals(expectedSelectStatement, selectStatement);
         } finally {
             conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 2d094f6..8ee8f97 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -66,14 +66,14 @@ public class QueryUtilTest {
     @Test
     public void testConstructSelectStatement() {
         assertEquals(
-                "SELECT \"ID\",\"NAME\" FROM MYTAB",
+                "SELECT \"ID\" , \"NAME\" FROM MYTAB",
                 QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
     }
 
     @Test
     public void testConstructSelectStatementWithSchema() {
         assertEquals(
-                "SELECT \"ID\",\"NAME\" FROM A.MYTAB",
+                "SELECT \"ID\" , \"NAME\" FROM A.MYTAB",
                 QueryUtil.constructSelectStatement("A.MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
     }
     
@@ -83,7 +83,7 @@ public class QueryUtilTest {
         final String schemaName = SchemaUtil.getEscapedArgument("a");
         final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         assertEquals(
-                "SELECT \"ID\",\"NAME\" FROM \"a\".MYTAB",
+                "SELECT \"ID\" , \"NAME\" FROM \"a\".MYTAB",
                 QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
     }
     
@@ -93,14 +93,14 @@ public class QueryUtilTest {
         final String schemaName = SchemaUtil.getEscapedArgument("a");
         final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         assertEquals(
-                "SELECT \"ID\",\"NAME\" FROM \"a\".\"mytab\"",
+                "SELECT \"ID\" , \"NAME\" FROM \"a\".\"mytab\"",
                 QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
     }
 
     @Test
     public void testConstructSelectWithHint() {
         assertEquals(
-            "SELECT /*+ NO_INDEX */ \"col1\",\"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)",
+            "SELECT /*+ NO_INDEX */ \"col1\" , \"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)",
             QueryUtil.constructSelectStatement("MYTAB", Lists.newArrayList("col1", "col2"),
                 "\"col2\"=? and \"col3\" is null", Hint.NO_INDEX, true));
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 7e04bc1..cbf3808 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -488,6 +488,14 @@
     <testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
     <plugins>
       <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
       </plugin>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java
new file mode 100644
index 0000000..e4b96a3
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.phoenix.end2end.BaseAggregateIT;
+import org.apache.phoenix.util.QueryBuilder;
+
+public class AggregateIT extends BaseAggregateIT {
+
+    @Override
+    protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+        String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
+        ResultSet rs = null;
+        try {
+            rs = executeQuery(conn, queryBuilder);
+            fail();
+        }
+        catch(Exception e) {
+            assertTrue(e.getMessage().contains(expectedSparkExceptionMsg));
+        }
+        return rs;
+    }
+
+    @Override
+    protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+        return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config);
+    }
+
+    @Override
+    protected void testCountNullInNonEmptyKeyValueCF(int columnEncodedBytes) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            //Type is INT
+            String intTableName=generateUniqueName();
+            String sql="create table " + intTableName + " (mykey integer not null primary key, A.COLA integer, B.COLB integer) "
+                    + "IMMUTABLE_ROWS=true, IMMUTABLE_STORAGE_SCHEME = ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = " + columnEncodedBytes + ", DISABLE_WAL=true";
+
+            conn.createStatement().execute(sql);
+            conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (1,1)");
+            conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (2,1)");
+            conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (3,1,2)");
+            conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (4,1)");
+            conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (5,1)");
+            conn.commit();
+
+            sql="select count(*) from "+intTableName;
+            QueryBuilder queryBuilder = new QueryBuilder()
+                .setSelectExpression("COUNT(*)")
+                .setFullTableName(intTableName);
+            ResultSet rs = executeQuery(conn, queryBuilder);
+            assertTrue(rs.next());
+            assertEquals(5, rs.getLong(1));
+
+            sql="select count(*) from "+intTableName + " where b.colb is not null";
+            queryBuilder.setWhereClause("`B.COLB` IS NOT NULL");
+            rs = executeQuery(conn, queryBuilder);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getLong(1));
+
+            sql="select count(*) from "+intTableName + " where b.colb is null";
+            queryBuilder.setWhereClause("`B.COLB` IS NULL");
+            rs = executeQuery(conn, queryBuilder);
+            assertTrue(rs.next());
+            assertEquals(4, rs.getLong(1));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
new file mode 100644
index 0000000..bdffaf5
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -0,0 +1,460 @@
+package org.apache.phoenix.spark;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseOrderByIT;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryBuilder;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import scala.Option;
+import scala.collection.JavaConverters;
+
+public class OrderByIT extends BaseOrderByIT {
+
+    @Override
+    protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+                                                    String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
+        ResultSet rs = null;
+        try {
+            rs = executeQuery(conn, queryBuilder);
+            fail();
+        }
+        catch(Exception e) {
+            assertTrue(e.getMessage().contains(expectedSparkExceptionMsg));
+        }
+        return rs;
+    }
+
+    @Override
+    protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+        return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config);
+    }
+
+    @Test
+    public void testOrderByWithJoin() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String tableName1 = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName1 +
+                    "  (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer " +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+            String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "a");
+            stmt.setInt(2, 40);
+            stmt.setString(3, "aa");
+            stmt.setInt(4, 10);
+            stmt.setString(5, "bb");
+            stmt.setInt(6, 20);
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setInt(2, 30);
+            stmt.setString(3, "cc");
+            stmt.setInt(4, 50);
+            stmt.setString(5, "dd");
+            stmt.setInt(6, 60);
+            stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setInt(2, 40);
+            stmt.setString(3, "bb");
+            stmt.setInt(4, 5);
+            stmt.setString(5, "aa");
+            stmt.setInt(6, 80);
+            stmt.execute();
+            conn.commit();
+
+            String tableName2 = generateUniqueName();
+            ddl = "CREATE TABLE " + tableName2 +
+                    "  (a_string varchar not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+
+            dml = "UPSERT INTO " + tableName2 + " VALUES(?, ?)";
+            stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "a");
+            stmt.setInt(2, 40);
+            stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setInt(2, 20);
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setInt(2, 30);
+            stmt.execute();
+            conn.commit();
+
+            // create two PhoenixRDDs  using the table names and columns that are required for the JOIN query
+            List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
+            SQLContext sqlContext = SparkUtil.getSqlContext();
+            Dataset phoenixDataSet =
+                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
+                            JavaConverters.collectionAsScalaIterableConverter(table1Columns)
+                                    .asScala().toSeq(),
+                            Option.apply((String) null), Option.apply(getUrl()), config, false,
+                            null).toDataFrame(sqlContext);
+            phoenixDataSet.registerTempTable(tableName1);
+            List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
+            phoenixDataSet =
+                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
+                            JavaConverters.collectionAsScalaIterableConverter(table2Columns)
+                                    .asScala().toSeq(),
+                            Option.apply((String) null), Option.apply(getUrl()), config, false,
+                            null).toDataFrame(sqlContext);
+            phoenixDataSet.registerTempTable(tableName2);
+
+            String query =
+                    "SELECT T1.* FROM " + tableName1 + " T1 JOIN " + tableName2
+                            + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T1.`CF1.B`";
+            Dataset<Row> dataset =
+                    sqlContext.sql(query);
+            List<Row> rows = dataset.collectAsList();
+            ResultSet rs = new SparkResultSet(rows, dataset.columns());
+
+            assertTrue(rs.next());
+            assertEquals("a",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertEquals("aa",rs.getString(3));
+            assertEquals(10,rs.getInt(4));
+            assertEquals("bb",rs.getString(5));
+            assertEquals(20,rs.getInt(6));
+            assertTrue(rs.next());
+            assertEquals("b",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertEquals("bb",rs.getString(3));
+            assertEquals(5,rs.getInt(4));
+            assertEquals("aa",rs.getString(5));
+            assertEquals(80,rs.getInt(6));
+            assertTrue(rs.next());
+            assertEquals("c",rs.getString(1));
+            assertEquals(30,rs.getInt(2));
+            assertEquals("cc",rs.getString(3));
+            assertEquals(50,rs.getInt(4));
+            assertEquals("dd",rs.getString(5));
+            assertEquals(60,rs.getInt(6));
+            assertFalse(rs.next());
+
+            query =
+                    "select t1.a_string, t2.col1 from " + tableName1 + " t1 join " + tableName2
+                            + " t2 on t1.a_string = t2.a_string order by t2.col1";
+            dataset =  sqlContext.sql(query);
+            rows = dataset.collectAsList();
+            rs = new SparkResultSet(rows, dataset.columns());
+            assertTrue(rs.next());
+            assertEquals("b",rs.getString(1));
+            assertEquals(20,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("c",rs.getString(1));
+            assertEquals(30,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("a",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testOrderByWithUnionAll() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)){
+            conn.setAutoCommit(false);
+            String tableName1 = generateUniqueName();
+            String ddl = "CREATE TABLE  " + tableName1 +
+                    "  (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer " +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+            String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "a");
+            stmt.setInt(2, 40);
+            stmt.setString(3, "aa");
+            stmt.setInt(4, 10);
+            stmt.setString(5, "bb");
+            stmt.setInt(6, 20);
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setInt(2, 30);
+            stmt.setString(3, "cc");
+            stmt.setInt(4, 50);
+            stmt.setString(5, "dd");
+            stmt.setInt(6, 60);
+            stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setInt(2, 40);
+            stmt.setString(3, "bb");
+            stmt.setInt(4, 5);
+            stmt.setString(5, "aa");
+            stmt.setInt(6, 80);
+            stmt.execute();
+            conn.commit();
+
+            String tableName2 = generateUniqueName();
+            ddl = "CREATE TABLE " + tableName2 +
+                    "  (a_string varchar not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+
+            dml = "UPSERT INTO " + tableName2 + " VALUES(?, ?)";
+            stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "aa");
+            stmt.setInt(2, 40);
+            stmt.execute();
+            stmt.setString(1, "bb");
+            stmt.setInt(2, 10);
+            stmt.execute();
+            stmt.setString(1, "cc");
+            stmt.setInt(2, 30);
+            stmt.execute();
+            conn.commit();
+
+
+            List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
+            SQLContext sqlContext = SparkUtil.getSqlContext();
+            Dataset phoenixDataSet =
+                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
+                            JavaConverters.collectionAsScalaIterableConverter(table1Columns)
+                                    .asScala().toSeq(),
+                            Option.apply((String) null), Option.apply(getUrl()), config, false,
+                            null).toDataFrame(sqlContext);
+            phoenixDataSet.registerTempTable(tableName1);
+            List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
+            phoenixDataSet =
+                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
+                            JavaConverters.collectionAsScalaIterableConverter(table2Columns)
+                                    .asScala().toSeq(),
+                            Option.apply((String) null), Option.apply(getUrl()), config, false,
+                            null).toDataFrame(sqlContext);
+            phoenixDataSet.registerTempTable(tableName2);
+
+            String query =
+                    "select a_string, `cf2.d` from " + tableName1 + " union all select * from "
+                            + tableName2 + " order by `cf2.d`";
+            Dataset<Row> dataset =
+                    sqlContext.sql(query);
+            List<Row> rows = dataset.collectAsList();
+            ResultSet rs = new SparkResultSet(rows, dataset.columns());
+            assertTrue(rs.next());
+            assertEquals("bb",rs.getString(1));
+            assertEquals(10,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("a",rs.getString(1));
+            assertEquals(20,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("cc",rs.getString(1));
+            assertEquals(30,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("aa",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("c",rs.getString(1));
+            assertEquals(60,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("b",rs.getString(1));
+            assertEquals(80,rs.getInt(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testOrderByWithExpression() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+
+        try {
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName +
+                    "  (a_string varchar not null, col1 integer, col2 integer, col3 timestamp, col4 varchar" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+
+            Date date = new Date(System.currentTimeMillis());
+            String dml = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "a");
+            stmt.setInt(2, 40);
+            stmt.setInt(3, 20);
+            stmt.setDate(4, new Date(date.getTime()));
+            stmt.setString(5, "xxyy");
+            stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setInt(2, 50);
+            stmt.setInt(3, 30);
+            stmt.setDate(4, new Date(date.getTime()-500));
+            stmt.setString(5, "yyzz");
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setInt(2, 60);
+            stmt.setInt(3, 20);
+            stmt.setDate(4, new Date(date.getTime()-300));
+            stmt.setString(5, "ddee");
+            stmt.execute();
+            conn.commit();
+
+            SQLContext sqlContext = SparkUtil.getSqlContext();
+            Dataset phoenixDataSet =
+                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
+                            JavaConverters
+                                    .collectionAsScalaIterableConverter(
+                                        Lists.newArrayList("col1", "col2", "col4"))
+                                    .asScala().toSeq(),
+                            Option.apply((String) null), Option.apply(getUrl()), config, false,
+                            null).toDataFrame(sqlContext);
+
+            phoenixDataSet.registerTempTable(tableName);
+            Dataset<Row> dataset =
+                    sqlContext.sql("SELECT col1+col2, col4, a_string FROM " + tableName
+                            + " ORDER BY col1+col2, col4");
+            List<Row> rows = dataset.collectAsList();
+            ResultSet rs = new SparkResultSet(rows, dataset.columns());
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(3));
+            assertFalse(rs.next());
+        } catch (SQLException e) {
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testColumnFamily() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName +
+                    "  (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer, col2 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+            String dml = "UPSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "a");
+            stmt.setInt(2, 40);
+            stmt.setString(3, "aa");
+            stmt.setInt(4, 10);
+            stmt.setString(5, "bb");
+            stmt.setInt(6, 20);
+            stmt.setInt(7, 1);
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setInt(2, 30);
+            stmt.setString(3, "cc");
+            stmt.setInt(4, 50);
+            stmt.setString(5, "dd");
+            stmt.setInt(6, 60);
+            stmt.setInt(7, 3);
+            stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setInt(2, 40);
+            stmt.setString(3, "bb");
+            stmt.setInt(4, 5);
+            stmt.setString(5, "aa");
+            stmt.setInt(6, 80);
+            stmt.setInt(7, 2);
+            stmt.execute();
+            conn.commit();
+
+
+            List<String> columns =
+                    Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D",
+                        "COL2");
+
+            SQLContext sqlContext = SparkUtil.getSqlContext();
+            Dataset phoenixDataSet =
+                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
+                            JavaConverters.collectionAsScalaIterableConverter(columns).asScala()
+                                    .toSeq(),
+                            Option.apply((String) null), Option.apply(url), config, false, null)
+                                    .toDataFrame(sqlContext);
+
+            phoenixDataSet.registerTempTable(tableName);
+            Dataset<Row> dataset =
+                    sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
+                            + tableName + " ORDER BY `CF1.A`,`CF2.C`");
+            List<Row> rows = dataset.collectAsList();
+            ResultSet rs = new SparkResultSet(rows, dataset.columns());
+
+            assertTrue(rs.next());
+            assertEquals("c",rs.getString(1));
+            assertEquals(30,rs.getInt(2));
+            assertEquals("cc",rs.getString(3));
+            assertEquals(50,rs.getInt(4));
+            assertEquals("dd",rs.getString(5));
+            assertEquals(60,rs.getInt(6));
+            assertEquals(3,rs.getInt(7));
+            assertTrue(rs.next());
+            assertEquals("b",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertEquals("bb",rs.getString(3));
+            assertEquals(5,rs.getInt(4));
+            assertEquals("aa",rs.getString(5));
+            assertEquals(80,rs.getInt(6));
+            assertEquals(2,rs.getInt(7));
+            assertTrue(rs.next());
+            assertEquals("a",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertEquals("aa",rs.getString(3));
+            assertEquals(10,rs.getInt(4));
+            assertEquals("bb",rs.getString(5));
+            assertEquals(20,rs.getInt(6));
+            assertEquals(1,rs.getInt(7));
+            assertFalse(rs.next());
+
+            dataset =
+                    sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
+                            + tableName + " ORDER BY COL2");
+            rows = dataset.collectAsList();
+            rs = new SparkResultSet(rows, dataset.columns());
+
+            assertTrue(rs.next());
+            assertEquals("a",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertEquals("aa",rs.getString(3));
+            assertEquals(10,rs.getInt(4));
+            assertEquals("bb",rs.getString(5));
+            assertEquals(20,rs.getInt(6));
+            assertEquals(1,rs.getInt(7));
+            assertTrue(rs.next());
+            assertEquals("b",rs.getString(1));
+            assertEquals(40,rs.getInt(2));
+            assertEquals("bb",rs.getString(3));
+            assertEquals(5,rs.getInt(4));
+            assertEquals("aa",rs.getString(5));
+            assertEquals(80,rs.getInt(6));
+            assertEquals(2,rs.getInt(7));
+            assertTrue(rs.next());
+            assertEquals("c",rs.getString(1));
+            assertEquals(30,rs.getInt(2));
+            assertEquals("cc",rs.getString(3));
+            assertEquals(50,rs.getInt(4));
+            assertEquals("dd",rs.getString(5));
+            assertEquals(60,rs.getInt(6));
+            assertEquals(3,rs.getInt(7));
+            assertFalse(rs.next());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java
new file mode 100644
index 0000000..d72acbd
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.phoenix.end2end.salted.BaseSaltedTableIT;
+import org.apache.phoenix.util.QueryBuilder;
+
+public class SaltedTableIT extends BaseSaltedTableIT {
+
+    @Override
+    protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+                                                    String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
+        ResultSet rs = null;
+        try {
+            rs = executeQuery(conn, queryBuilder);
+            fail();
+        }
+        catch(Exception e) {
+            assertTrue(e.getMessage().contains(expectedSparkExceptionMsg));
+        }
+        return rs;
+    }
+
+    @Override
+    protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+        return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
new file mode 100644
index 0000000..6285209
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -0,0 +1,87 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.QueryBuilder;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.SparkPlan;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+public class SparkUtil {
+
+    public static final String APP_NAME = "Java Spark Tests";
+    public static final String NUM_EXECUTORS = "local[2]";
+    public static final String UI_SHOW_CONSOLE_PROGRESS = "spark.ui.showConsoleProgress";
+
+    public static SparkContext getSparkContext() {
+        return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
+                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sparkContext();
+    }
+
+    public static SQLContext getSqlContext() {
+        return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
+                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sqlContext();
+    }
+
+    public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
+            throws SQLException {
+        SQLContext sqlContext = SparkUtil.getSqlContext();
+
+        boolean forceRowKeyOrder =
+                conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()
+                        .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, false);
+        // if we are forcing row key order we have to add an ORDER BY
+        // here we assume that the required columns are in the primary key column order
+        String prevOrderBy = queryBuilder.getOrderByClause();
+        if (forceRowKeyOrder &&  (queryBuilder.getOrderByClause()==null || queryBuilder.getOrderByClause().isEmpty())) {
+            queryBuilder.setOrderByClause(Joiner.on(", ").join(queryBuilder.getRequiredColumns()));
+        }
+
+        // create PhoenixRDD using the table name and columns that are required by the query
+        // since we don't set the predicate filtering is done after rows are returned from spark
+        Dataset phoenixDataSet =
+                new PhoenixRDD(SparkUtil.getSparkContext(), queryBuilder.getFullTableName(),
+                        JavaConverters.collectionAsScalaIterableConverter(queryBuilder.getRequiredColumns()).asScala()
+                                .toSeq(),
+                        Option.apply((String) null), Option.apply(url), config, false,
+                        null).toDataFrame(sqlContext);
+
+        phoenixDataSet.registerTempTable(queryBuilder.getFullTableName());
+        Dataset<Row> dataset = sqlContext.sql(queryBuilder.build());
+        SparkPlan plan = dataset.queryExecution().executedPlan();
+        List<Row> rows = dataset.collectAsList();
+        queryBuilder.setOrderByClause(prevOrderBy);
+        ResultSet rs = new SparkResultSet(rows, dataset.columns());
+        return rs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 4e11acc..d1e38fa 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -23,6 +23,7 @@ import org.joda.time.DateTime
 import org.apache.spark.{SparkConf, SparkContext}
 import scala.collection.mutable.ListBuffer
 import org.apache.hadoop.conf.Configuration
+
 /**
   * Note: If running directly from an IDE, these are the recommended VM parameters:
   * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
@@ -287,11 +288,11 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
     val plan = res.queryExecution.sparkPlan
     // filters should be pushed into phoenix relation
-    assert(plan.toString.contains("PushedFilters: [IsNotNull(COL1), IsNotNull(ID), " +
-      "EqualTo(COL1,test_row_1), EqualTo(ID,1)]"))
+    assert(plan.toString.contains("PushedFilters: [*IsNotNull(COL1), *IsNotNull(ID), " +
+      "*EqualTo(COL1,test_row_1), *EqualTo(ID,1)]"))
     // spark should run the filters on the rows returned by Phoenix
-    assert(!plan.toString.contains("Filter (((isnotnull(COL1#8) && isnotnull(ID#7L)) " +
-      "&& (COL1#8 = test_row_1)) && (ID#7L = 1))"))
+    assert(!plan.toString.matches(".*Filter (((isnotnull(COL1.*) && isnotnull(ID.*)) "
+      + " && (COL1.* = test_row_1)) && (ID.* = 1)).*"))
   }
 
   test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {