Posted to commits@phoenix.apache.org by td...@apache.org on 2018/11/06 23:14:23 UTC
[2/6] phoenix git commit: PHOENIX-4981 Add tests for ORDER BY, GROUP BY and salted tables using phoenix-spark
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
index c9168f1..69c9869 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
@@ -37,104 +37,18 @@ import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Test;
-
/**
* Tests for table with transparent salting.
*/
-public class SaltedTableIT extends ParallelStatsDisabledIT {
-
- private static String getUniqueTableName() {
- return SchemaUtil.getTableName(generateUniqueName(), generateUniqueName());
- }
-
- private static String initTableValues(byte[][] splits) throws Exception {
- String tableName = getUniqueTableName();
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
-
- // Rows we inserted:
- // 1ab123abc111
- // 1abc456abc111
- // 1de123abc111
- // 2abc123def222
- // 3abc123ghi333
- // 4abc123jkl444
- try {
- // Upsert with no columns specified.
- ensureTableCreated(getUrl(), tableName, TABLE_WITH_SALTING, splits, null, null);
- String query = "UPSERT INTO " + tableName + " VALUES(?,?,?,?,?)";
- PreparedStatement stmt = conn.prepareStatement(query);
- stmt.setInt(1, 1);
- stmt.setString(2, "ab");
- stmt.setString(3, "123");
- stmt.setString(4, "abc");
- stmt.setInt(5, 111);
- stmt.execute();
- conn.commit();
-
- stmt.setInt(1, 1);
- stmt.setString(2, "abc");
- stmt.setString(3, "456");
- stmt.setString(4, "abc");
- stmt.setInt(5, 111);
- stmt.execute();
- conn.commit();
-
- // Test upsert when statement explicitly specifies the columns to upsert into.
- query = "UPSERT INTO " + tableName +
- " (a_integer, a_string, a_id, b_string, b_integer) " +
- " VALUES(?,?,?,?,?)";
- stmt = conn.prepareStatement(query);
-
- stmt.setInt(1, 1);
- stmt.setString(2, "de");
- stmt.setString(3, "123");
- stmt.setString(4, "abc");
- stmt.setInt(5, 111);
- stmt.execute();
- conn.commit();
-
- stmt.setInt(1, 2);
- stmt.setString(2, "abc");
- stmt.setString(3, "123");
- stmt.setString(4, "def");
- stmt.setInt(5, 222);
- stmt.execute();
- conn.commit();
-
- // Test upsert when the order of columns is shuffled.
- query = "UPSERT INTO " + tableName +
- " (a_string, a_integer, a_id, b_string, b_integer) " +
- " VALUES(?,?,?,?,?)";
- stmt = conn.prepareStatement(query);
- stmt.setString(1, "abc");
- stmt.setInt(2, 3);
- stmt.setString(3, "123");
- stmt.setString(4, "ghi");
- stmt.setInt(5, 333);
- stmt.execute();
- conn.commit();
-
- stmt.setString(1, "abc");
- stmt.setInt(2, 4);
- stmt.setString(3, "123");
- stmt.setString(4, "jkl");
- stmt.setInt(5, 444);
- stmt.execute();
- conn.commit();
- } finally {
- conn.close();
- }
- return tableName;
- }
+public class SaltedTableIT extends BaseSaltedTableIT {
@Test
public void testTableWithInvalidBucketNumber() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
try {
- String query = "create table " + getUniqueTableName() + " (a_integer integer not null CONSTRAINT pk PRIMARY KEY (a_integer)) SALT_BUCKETS = 257";
+ String query = "create table " + generateUniqueName() + " (a_integer integer not null CONSTRAINT pk PRIMARY KEY (a_integer)) SALT_BUCKETS = 257";
PreparedStatement stmt = conn.prepareStatement(query);
stmt.execute();
fail("Should have caught exception");
@@ -148,370 +62,12 @@ public class SaltedTableIT extends ParallelStatsDisabledIT {
@Test
public void testTableWithSplit() throws Exception {
try {
- createTestTable(getUrl(), "create table " + getUniqueTableName() + " (a_integer integer not null primary key) SALT_BUCKETS = 4",
+ createTestTable(getUrl(), "create table " + generateUniqueName() + " (a_integer integer not null primary key) SALT_BUCKETS = 4",
new byte[][] {{1}, {2,3}, {2,5}, {3}}, null);
fail("Should have caught exception");
} catch (SQLException e) {
assertTrue(e.getMessage(), e.getMessage().contains("ERROR 1022 (42Y81): Should not specify split points on salted table with default row key order."));
}
}
-
- @Test
- public void testSelectValueNoWhereClause() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- String tableName = initTableValues(null);
-
- String query = "SELECT * FROM " + tableName;
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
-
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("ab", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("456", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("de", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("def", rs.getString(4));
- assertEquals(222, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(3, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("ghi", rs.getString(4));
- assertEquals(333, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("jkl", rs.getString(4));
- assertEquals(444, rs.getInt(5));
-
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectValueWithFullyQualifiedWhereClause() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- String tableName = initTableValues(null);
- String query;
- PreparedStatement stmt;
- ResultSet rs;
-
- // Variable length slot with bounded ranges.
- query = "SELECT * FROM " + tableName +
- " WHERE a_integer = 1 AND a_string >= 'ab' AND a_string < 'de' AND a_id = '123'";
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("ab", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
- assertFalse(rs.next());
-
- // all single slots with one value.
- query = "SELECT * FROM " + tableName +
- " WHERE a_integer = 1 AND a_string = 'ab' AND a_id = '123'";
- stmt = conn.prepareStatement(query);
-
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("ab", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
- assertFalse(rs.next());
-
- // all single slots with multiple values.
- query = "SELECT * FROM " + tableName +
- " WHERE a_integer in (2, 4) AND a_string = 'abc' AND a_id = '123'";
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
-
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("def", rs.getString(4));
- assertEquals(222, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("jkl", rs.getString(4));
- assertEquals(444, rs.getInt(5));
- assertFalse(rs.next());
-
- query = "SELECT a_integer, a_string FROM " + tableName +
- " WHERE a_integer in (1,2,3,4) AND a_string in ('a', 'abc', 'de') AND a_id = '123'";
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
-
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("de", rs.getString(2));
-
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
-
- assertTrue(rs.next());
- assertEquals(3, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
-
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertFalse(rs.next());
-
- // fixed length slot with bounded ranges.
- query = "SELECT a_string, a_id FROM " + tableName +
- " WHERE a_integer > 1 AND a_integer < 4 AND a_string = 'abc' AND a_id = '123'";
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("abc", rs.getString(1));
- assertEquals("123", rs.getString(2));
-
- assertTrue(rs.next());
- assertEquals("abc", rs.getString(1));
- assertEquals("123", rs.getString(2));
- assertFalse(rs.next());
-
- // fixed length slot with unbound ranges.
- query = "SELECT b_string, b_integer FROM " + tableName +
- " WHERE a_integer > 1 AND a_string = 'abc' AND a_id = '123'";
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("def", rs.getString(1));
- assertEquals(222, rs.getInt(2));
-
- assertTrue(rs.next());
- assertEquals("ghi", rs.getString(1));
- assertEquals(333, rs.getInt(2));
-
- assertTrue(rs.next());
- assertEquals("jkl", rs.getString(1));
- assertEquals(444, rs.getInt(2));
- assertFalse(rs.next());
-
- // Variable length slot with unbounded ranges.
- query = "SELECT * FROM " + tableName +
- " WHERE a_integer = 1 AND a_string > 'ab' AND a_id = '123'";
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("de", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
- assertFalse(rs.next());
-
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectValueWithNotFullyQualifiedWhereClause() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- String tableName = initTableValues(null);
-
- // Where without fully qualified key, point query.
- String query = "SELECT * FROM " + tableName + " WHERE a_integer = ? AND a_string = ?";
- PreparedStatement stmt = conn.prepareStatement(query);
-
- stmt.setInt(1, 1);
- stmt.setString(2, "abc");
- ResultSet rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("456", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
- assertFalse(rs.next());
-
- // Where without fully qualified key, range query.
- query = "SELECT * FROM " + tableName + " WHERE a_integer >= 2";
- stmt = conn.prepareStatement(query);
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("def", rs.getString(4));
- assertEquals(222, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(3, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("ghi", rs.getString(4));
- assertEquals(333, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("jkl", rs.getString(4));
- assertEquals(444, rs.getInt(5));
- assertFalse(rs.next());
-
- // With point query.
- query = "SELECT a_string FROM " + tableName + " WHERE a_string = ?";
- stmt = conn.prepareStatement(query);
- stmt.setString(1, "de");
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("de", rs.getString(1));
- assertFalse(rs.next());
-
- query = "SELECT a_id FROM " + tableName + " WHERE a_id = ?";
- stmt = conn.prepareStatement(query);
- stmt.setString(1, "456");
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("456", rs.getString(1));
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectWithGroupBy() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- String tableName = initTableValues(null);
-
- String query = "SELECT a_integer FROM " + tableName + " GROUP BY a_integer";
- PreparedStatement stmt = conn.prepareStatement(query);
- ResultSet rs = stmt.executeQuery();
- int count = 0;
- while (rs.next()) {
- count++;
- }
- assertEquals("Group by does not return the right count.", count, 4);
- } finally {
- conn.close();
- }
- }
- @Test
- public void testLimitScan() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- String tableName = initTableValues(null);
-
- String query = "SELECT a_integer FROM " + tableName + " WHERE a_string='abc' LIMIT 1";
- PreparedStatement stmt = conn.prepareStatement(query);
- ResultSet rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectWithOrderByRowKey() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- String tableName = initTableValues(null);
-
- String query = "SELECT * FROM " + tableName + " ORDER BY a_integer, a_string, a_id";
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet explainPlan = statement.executeQuery("EXPLAIN " + query);
- // Confirm that ORDER BY in row key order will be optimized out for salted table
- assertEquals("CLIENT PARALLEL 4-WAY FULL SCAN OVER " + tableName + "\n" +
- "CLIENT MERGE SORT", QueryUtil.getExplainPlan(explainPlan));
- ResultSet rs = statement.executeQuery();
-
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("ab", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("456", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals("de", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("abc", rs.getString(4));
- assertEquals(111, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("def", rs.getString(4));
- assertEquals(222, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(3, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("ghi", rs.getString(4));
- assertEquals(333, rs.getInt(5));
-
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
- assertEquals("abc", rs.getString(2));
- assertEquals("123", rs.getString(3));
- assertEquals("jkl", rs.getString(4));
- assertEquals(444, rs.getInt(5));
-
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java
new file mode 100644
index 0000000..afce0dd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.phoenix.parse.HintNode;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.phoenix.util.SchemaUtil.getEscapedFullColumnName;
+
+public class QueryBuilder {
+
+ private String fullTableName;
+ // regular columns that are in the select clause
+ private List<String> selectColumns = Collections.emptyList();
+
+ // columns that are required for expressions in the select clause
+ private List<String> selectExpressionColumns = Collections.emptyList();
+ // expression string in the select clause (e.g. COL1 || COL2)
+ private String selectExpression;
+ private String whereClause;
+ private String orderByClause;
+ private String groupByClause;
+ private String havingClause;
+ private HintNode.Hint hint;
+ private boolean escapeCols;
+ private boolean distinct;
+ private int limit;
+
+ public String getFullTableName() {
+ return fullTableName;
+ }
+
+ /**
+ * @return column names required to evaluate this select statement
+ */
+ public List<String> getRequiredColumns() {
+ List<String> allColumns = Lists.newArrayList(selectColumns);
+ if (!CollectionUtils.isEmpty(selectExpressionColumns)) {
+ allColumns.addAll(selectExpressionColumns);
+ }
+ return allColumns;
+ }
+
+ public String getWhereClause() {
+ return whereClause;
+ }
+
+ public HintNode.Hint getHint() {
+ return hint;
+ }
+
+ public String getOrderByClause() {
+ return orderByClause;
+ }
+
+ public String getGroupByClause() {
+ return groupByClause;
+ }
+
+ public QueryBuilder setOrderByClause(String orderByClause) {
+ this.orderByClause = orderByClause;
+ return this;
+ }
+
+ public QueryBuilder setFullTableName(String fullTableName) {
+ this.fullTableName = fullTableName;
+ return this;
+ }
+
+ public QueryBuilder setSelectColumns(List<String> columns) {
+ this.selectColumns = columns;
+ return this;
+ }
+
+ public QueryBuilder setWhereClause(String whereClause) {
+ this.whereClause = whereClause;
+ return this;
+ }
+
+ public QueryBuilder setHint(HintNode.Hint hint) {
+ this.hint = hint;
+ return this;
+ }
+
+ public QueryBuilder setEscapeCols(boolean escapeCols) {
+ this.escapeCols = escapeCols;
+ return this;
+ }
+
+ public QueryBuilder setGroupByClause(String groupByClause) {
+ this.groupByClause = groupByClause;
+ return this;
+ }
+
+ public QueryBuilder setHavingClause(String havingClause) {
+ this.havingClause = havingClause;
+ return this;
+ }
+
+ public List<String> getSelectExpressionColumns() {
+ return selectExpressionColumns;
+ }
+
+ public QueryBuilder setSelectExpressionColumns(List<String> selectExpressionColumns) {
+ this.selectExpressionColumns = selectExpressionColumns;
+ return this;
+ }
+
+ public String getSelectExpression() {
+ return selectExpression;
+ }
+
+ public QueryBuilder setSelectExpression(String selectExpression) {
+ this.selectExpression = selectExpression;
+ return this;
+ }
+
+ public QueryBuilder setDistinct(boolean distinct) {
+ this.distinct = distinct;
+ return this;
+ }
+
+ public QueryBuilder setLimit(int limit) {
+ this.limit = limit;
+ return this;
+ }
+
+ public String build() {
+ Preconditions.checkNotNull(fullTableName, "Table name cannot be null");
+ if (CollectionUtils.isEmpty(selectColumns) && StringUtils.isBlank(selectExpression)) {
+ throw new IllegalArgumentException("At least one column or select expression must be provided");
+ }
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT ");
+
+ if (distinct) {
+ query.append(" DISTINCT ");
+ }
+
+ if (hint != null) {
+ final HintNode node = new HintNode(hint.name());
+ String hintStr = node.toString();
+ query.append(hintStr);
+ }
+
+ StringBuilder selectClauseBuilder = new StringBuilder();
+ if (StringUtils.isNotBlank(selectExpression)) {
+ if (selectClauseBuilder.length()!=0) {
+ selectClauseBuilder.append(" , ");
+ }
+ selectClauseBuilder.append(selectExpression);
+ }
+
+ boolean first = true;
+ for (String col : selectColumns) {
+ if (StringUtils.isNotBlank(col)) {
+ if ((first && selectClauseBuilder.length()!=0) || !first) {
+ selectClauseBuilder.append(" , ");
+ }
+ String fullColumnName = col;
+ if (escapeCols) {
+ fullColumnName = getEscapedFullColumnName(col);
+ }
+ selectClauseBuilder.append(fullColumnName);
+ first = false;
+ }
+ }
+
+ query.append(selectClauseBuilder);
+ query.append(" FROM ");
+ query.append(fullTableName);
+ if (StringUtils.isNotBlank(whereClause)) {
+ query.append(" WHERE (").append(whereClause).append(")");
+ }
+ if (StringUtils.isNotBlank(groupByClause)) {
+ query.append(" GROUP BY ").append(groupByClause);
+ }
+ if (StringUtils.isNotBlank(havingClause)) {
+ query.append(" HAVING ").append(havingClause);
+ }
+ if (StringUtils.isNotBlank(orderByClause)) {
+ query.append(" ORDER BY ").append(orderByClause);
+ }
+ if (limit > 0) {
+ query.append(" LIMIT ").append(limit);
+ }
+ return query.toString();
+ }
+
+}
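
For reference, a minimal sketch of how the new QueryBuilder above might be assembled; the table and column names below are illustrative only and are not part of this patch:

    // Illustrative only: builds
    // SELECT A , B FROM MY_TABLE WHERE (A > 1) GROUP BY A ORDER BY A LIMIT 10
    String sql = new QueryBuilder()
        .setFullTableName("MY_TABLE")                     // required; build() rejects a null table name
        .setSelectColumns(Lists.newArrayList("A", "B"))   // at least one column or a select expression is required
        .setWhereClause("A > 1")                          // rendered as WHERE (A > 1)
        .setGroupByClause("A")
        .setOrderByClause("A")
        .setLimit(10)
        .build();

Calling setEscapeCols(true) would additionally quote each projected column via SchemaUtil.getEscapedFullColumnName.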
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 94cbfea..4501158 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -217,45 +217,15 @@ public final class QueryUtil {
*
* @param fullTableName name of the table for which the select statement needs to be created.
* @param columns list of columns to be projected in the select statement.
- * @param conditions The condition clause to be added to the WHERE condition
+ * @param whereClause The condition clause to be added to the WHERE condition
* @param hint hint to use
* @param escapeCols whether to escape the projected columns
* @return Select Query
*/
public static String constructSelectStatement(String fullTableName, List<String> columns,
- final String conditions, Hint hint, boolean escapeCols) {
- Preconditions.checkNotNull(fullTableName, "Table name cannot be null");
- if (columns == null || columns.isEmpty()) {
- throw new IllegalArgumentException("At least one column must be provided");
- }
- StringBuilder query = new StringBuilder();
- query.append("SELECT ");
-
- String hintStr = "";
- if (hint != null) {
- final HintNode node = new HintNode(hint.name());
- hintStr = node.toString();
- }
- query.append(hintStr);
-
- for (String col : columns) {
- if (col != null) {
- String fullColumnName = col;
- if (escapeCols) {
- fullColumnName = getEscapedFullColumnName(col);
- }
- query.append(fullColumnName);
- query.append(",");
- }
- }
- // Remove the trailing comma
- query.setLength(query.length() - 1);
- query.append(" FROM ");
- query.append(fullTableName);
- if (conditions != null && conditions.length() > 0) {
- query.append(" WHERE (").append(conditions).append(")");
- }
- return query.toString();
+ final String whereClause, Hint hint, boolean escapeCols) {
+ return new QueryBuilder().setFullTableName(fullTableName).setSelectColumns(columns)
+ .setWhereClause(whereClause).setHint(hint).setEscapeCols(escapeCols).build();
}
/**
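
As the updated unit tests below show, delegating to QueryBuilder changes the separator between projected columns from "," to " , ". A quick illustration, assuming the three-argument overload exercised by QueryUtilTest (table and column names shown literally for clarity):

    // Now produces:  SELECT "ID" , "NAME" FROM MYTAB
    // (previously:   SELECT "ID","NAME" FROM MYTAB)
    String sql = QueryUtil.constructSelectStatement("MYTAB",
            ImmutableList.of("ID", "NAME"), null);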
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
index c6bb739..a904bca 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
@@ -47,7 +47,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest {
String sqlStr =
IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, columnNames,
SCRUTINY_TIME_MILLIS);
- assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))",
+ assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))",
sqlStr);
}
@@ -58,7 +58,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest {
String query =
IndexScrutinyTableOutput.getSqlQueryMissingTargetRows(conn, columnNames,
SCRUTINY_TIME_MILLIS);
- assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))",
+ assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))",
query);
}
@@ -69,7 +69,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest {
String query =
IndexScrutinyTableOutput.getSqlQueryBadCoveredColVal(conn, columnNames,
SCRUTINY_TIME_MILLIS);
- assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))",
+ assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))",
query);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index f864dd5..0c4c004 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -145,7 +145,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
- final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + tableName ;
+ final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + tableName ;
assertEquals(expectedSelectStatement, selectStatement);
} finally {
conn.close();
@@ -167,7 +167,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
PhoenixConfigurationUtil.setInputTableName(configuration, fullTableName);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
- final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + fullTableName;
+ final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + fullTableName;
assertEquals(expectedSelectStatement, selectStatement);
} finally {
conn.close();
@@ -209,7 +209,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
- final String expectedSelectStatement = "SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + tableName ;
+ final String expectedSelectStatement = "SELECT \"ID\" , \"0\".\"VCARRAY\" FROM " + tableName ;
assertEquals(expectedSelectStatement, selectStatement);
} finally {
conn.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 2d094f6..8ee8f97 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -66,14 +66,14 @@ public class QueryUtilTest {
@Test
public void testConstructSelectStatement() {
assertEquals(
- "SELECT \"ID\",\"NAME\" FROM MYTAB",
+ "SELECT \"ID\" , \"NAME\" FROM MYTAB",
QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
}
@Test
public void testConstructSelectStatementWithSchema() {
assertEquals(
- "SELECT \"ID\",\"NAME\" FROM A.MYTAB",
+ "SELECT \"ID\" , \"NAME\" FROM A.MYTAB",
QueryUtil.constructSelectStatement("A.MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
}
@@ -83,7 +83,7 @@ public class QueryUtilTest {
final String schemaName = SchemaUtil.getEscapedArgument("a");
final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
assertEquals(
- "SELECT \"ID\",\"NAME\" FROM \"a\".MYTAB",
+ "SELECT \"ID\" , \"NAME\" FROM \"a\".MYTAB",
QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
}
@@ -93,14 +93,14 @@ public class QueryUtilTest {
final String schemaName = SchemaUtil.getEscapedArgument("a");
final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
assertEquals(
- "SELECT \"ID\",\"NAME\" FROM \"a\".\"mytab\"",
+ "SELECT \"ID\" , \"NAME\" FROM \"a\".\"mytab\"",
QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
}
@Test
public void testConstructSelectWithHint() {
assertEquals(
- "SELECT /*+ NO_INDEX */ \"col1\",\"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)",
+ "SELECT /*+ NO_INDEX */ \"col1\" , \"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)",
QueryUtil.constructSelectStatement("MYTAB", Lists.newArrayList("col1", "col2"),
"\"col2\"=? and \"col3\" is null", Hint.NO_INDEX, true));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 7e04bc1..cbf3808 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -488,6 +488,14 @@
<testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java
new file mode 100644
index 0000000..e4b96a3
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.phoenix.end2end.BaseAggregateIT;
+import org.apache.phoenix.util.QueryBuilder;
+
+public class AggregateIT extends BaseAggregateIT {
+
+ @Override
+ protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+ String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
+ ResultSet rs = null;
+ try {
+ rs = executeQuery(conn, queryBuilder);
+ fail();
+ }
+ catch(Exception e) {
+ assertTrue(e.getMessage().contains(expectedSparkExceptionMsg));
+ }
+ return rs;
+ }
+
+ @Override
+ protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+ return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config);
+ }
+
+ @Override
+ protected void testCountNullInNonEmptyKeyValueCF(int columnEncodedBytes) throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ //Type is INT
+ String intTableName=generateUniqueName();
+ String sql="create table " + intTableName + " (mykey integer not null primary key, A.COLA integer, B.COLB integer) "
+ + "IMMUTABLE_ROWS=true, IMMUTABLE_STORAGE_SCHEME = ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = " + columnEncodedBytes + ", DISABLE_WAL=true";
+
+ conn.createStatement().execute(sql);
+ conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (1,1)");
+ conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (2,1)");
+ conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (3,1,2)");
+ conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (4,1)");
+ conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (5,1)");
+ conn.commit();
+
+ sql="select count(*) from "+intTableName;
+ QueryBuilder queryBuilder = new QueryBuilder()
+ .setSelectExpression("COUNT(*)")
+ .setFullTableName(intTableName);
+ ResultSet rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals(5, rs.getLong(1));
+
+ sql="select count(*) from "+intTableName + " where b.colb is not null";
+ queryBuilder.setWhereClause("`B.COLB` IS NOT NULL");
+ rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getLong(1));
+
+ sql="select count(*) from "+intTableName + " where b.colb is null";
+ queryBuilder.setWhereClause("`B.COLB` IS NULL");
+ rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
new file mode 100644
index 0000000..bdffaf5
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -0,0 +1,460 @@
+package org.apache.phoenix.spark;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseOrderByIT;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryBuilder;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import scala.Option;
+import scala.collection.JavaConverters;
+
+public class OrderByIT extends BaseOrderByIT {
+
+ @Override
+ protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+ String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
+ ResultSet rs = null;
+ try {
+ rs = executeQuery(conn, queryBuilder);
+ fail();
+ }
+ catch(Exception e) {
+ assertTrue(e.getMessage().contains(expectedSparkExceptionMsg));
+ }
+ return rs;
+ }
+
+ @Override
+ protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+ return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config);
+ }
+
+ @Test
+ public void testOrderByWithJoin() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName1 = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName1 +
+ " (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer " +
+ " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+ String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "a");
+ stmt.setInt(2, 40);
+ stmt.setString(3, "aa");
+ stmt.setInt(4, 10);
+ stmt.setString(5, "bb");
+ stmt.setInt(6, 20);
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setInt(2, 30);
+ stmt.setString(3, "cc");
+ stmt.setInt(4, 50);
+ stmt.setString(5, "dd");
+ stmt.setInt(6, 60);
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setInt(2, 40);
+ stmt.setString(3, "bb");
+ stmt.setInt(4, 5);
+ stmt.setString(5, "aa");
+ stmt.setInt(6, 80);
+ stmt.execute();
+ conn.commit();
+
+ String tableName2 = generateUniqueName();
+ ddl = "CREATE TABLE " + tableName2 +
+ " (a_string varchar not null, col1 integer" +
+ " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+
+ dml = "UPSERT INTO " + tableName2 + " VALUES(?, ?)";
+ stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "a");
+ stmt.setInt(2, 40);
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setInt(2, 20);
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setInt(2, 30);
+ stmt.execute();
+ conn.commit();
+
+ // create two PhoenixRDDs using the table names and columns that are required for the JOIN query
+ List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
+ SQLContext sqlContext = SparkUtil.getSqlContext();
+ Dataset phoenixDataSet =
+ new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
+ JavaConverters.collectionAsScalaIterableConverter(table1Columns)
+ .asScala().toSeq(),
+ Option.apply((String) null), Option.apply(getUrl()), config, false,
+ null).toDataFrame(sqlContext);
+ phoenixDataSet.registerTempTable(tableName1);
+ List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
+ phoenixDataSet =
+ new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
+ JavaConverters.collectionAsScalaIterableConverter(table2Columns)
+ .asScala().toSeq(),
+ Option.apply((String) null), Option.apply(getUrl()), config, false,
+ null).toDataFrame(sqlContext);
+ phoenixDataSet.registerTempTable(tableName2);
+
+ String query =
+ "SELECT T1.* FROM " + tableName1 + " T1 JOIN " + tableName2
+ + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T1.`CF1.B`";
+ Dataset<Row> dataset =
+ sqlContext.sql(query);
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertEquals("aa",rs.getString(3));
+ assertEquals(10,rs.getInt(4));
+ assertEquals("bb",rs.getString(5));
+ assertEquals(20,rs.getInt(6));
+ assertTrue(rs.next());
+ assertEquals("b",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertEquals("bb",rs.getString(3));
+ assertEquals(5,rs.getInt(4));
+ assertEquals("aa",rs.getString(5));
+ assertEquals(80,rs.getInt(6));
+ assertTrue(rs.next());
+ assertEquals("c",rs.getString(1));
+ assertEquals(30,rs.getInt(2));
+ assertEquals("cc",rs.getString(3));
+ assertEquals(50,rs.getInt(4));
+ assertEquals("dd",rs.getString(5));
+ assertEquals(60,rs.getInt(6));
+ assertFalse(rs.next());
+
+ query =
+ "select t1.a_string, t2.col1 from " + tableName1 + " t1 join " + tableName2
+ + " t2 on t1.a_string = t2.a_string order by t2.col1";
+ dataset = sqlContext.sql(query);
+ rows = dataset.collectAsList();
+ rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals("b",rs.getString(1));
+ assertEquals(20,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("c",rs.getString(1));
+ assertEquals(30,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testOrderByWithUnionAll() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)){
+ conn.setAutoCommit(false);
+ String tableName1 = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName1 +
+ " (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer " +
+ " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+ String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "a");
+ stmt.setInt(2, 40);
+ stmt.setString(3, "aa");
+ stmt.setInt(4, 10);
+ stmt.setString(5, "bb");
+ stmt.setInt(6, 20);
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setInt(2, 30);
+ stmt.setString(3, "cc");
+ stmt.setInt(4, 50);
+ stmt.setString(5, "dd");
+ stmt.setInt(6, 60);
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setInt(2, 40);
+ stmt.setString(3, "bb");
+ stmt.setInt(4, 5);
+ stmt.setString(5, "aa");
+ stmt.setInt(6, 80);
+ stmt.execute();
+ conn.commit();
+
+ String tableName2 = generateUniqueName();
+ ddl = "CREATE TABLE " + tableName2 +
+ " (a_string varchar not null, col1 integer" +
+ " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+
+ dml = "UPSERT INTO " + tableName2 + " VALUES(?, ?)";
+ stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "aa");
+ stmt.setInt(2, 40);
+ stmt.execute();
+ stmt.setString(1, "bb");
+ stmt.setInt(2, 10);
+ stmt.execute();
+ stmt.setString(1, "cc");
+ stmt.setInt(2, 30);
+ stmt.execute();
+ conn.commit();
+
+
+ List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
+ SQLContext sqlContext = SparkUtil.getSqlContext();
+ Dataset phoenixDataSet =
+ new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
+ JavaConverters.collectionAsScalaIterableConverter(table1Columns)
+ .asScala().toSeq(),
+ Option.apply((String) null), Option.apply(getUrl()), config, false,
+ null).toDataFrame(sqlContext);
+ phoenixDataSet.registerTempTable(tableName1);
+ List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
+ phoenixDataSet =
+ new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
+ JavaConverters.collectionAsScalaIterableConverter(table2Columns)
+ .asScala().toSeq(),
+ Option.apply((String) null), Option.apply(getUrl()), config, false,
+ null).toDataFrame(sqlContext);
+ phoenixDataSet.registerTempTable(tableName2);
+
+ String query =
+ "select a_string, `cf2.d` from " + tableName1 + " union all select * from "
+ + tableName2 + " order by `cf2.d`";
+ Dataset<Row> dataset =
+ sqlContext.sql(query);
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals("bb",rs.getString(1));
+ assertEquals(10,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(20,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("cc",rs.getString(1));
+ assertEquals(30,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("aa",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("c",rs.getString(1));
+ assertEquals(60,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("b",rs.getString(1));
+ assertEquals(80,rs.getInt(2));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testOrderByWithExpression() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(false);
+
+ try {
+ String tableName = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName +
+ " (a_string varchar not null, col1 integer, col2 integer, col3 timestamp, col4 varchar" +
+ " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+
+ Date date = new Date(System.currentTimeMillis());
+ String dml = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "a");
+ stmt.setInt(2, 40);
+ stmt.setInt(3, 20);
+ stmt.setDate(4, new Date(date.getTime()));
+ stmt.setString(5, "xxyy");
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setInt(2, 50);
+ stmt.setInt(3, 30);
+ stmt.setDate(4, new Date(date.getTime()-500));
+ stmt.setString(5, "yyzz");
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setInt(2, 60);
+ stmt.setInt(3, 20);
+ stmt.setDate(4, new Date(date.getTime()-300));
+ stmt.setString(5, "ddee");
+ stmt.execute();
+ conn.commit();
+
+ SQLContext sqlContext = SparkUtil.getSqlContext();
+ Dataset phoenixDataSet =
+ new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
+ JavaConverters
+ .collectionAsScalaIterableConverter(
+ Lists.newArrayList("col1", "col2", "col4"))
+ .asScala().toSeq(),
+ Option.apply((String) null), Option.apply(getUrl()), config, false,
+ null).toDataFrame(sqlContext);
+
+ phoenixDataSet.registerTempTable(tableName);
+ Dataset<Row> dataset =
+ sqlContext.sql("SELECT col1+col2, col4, a_string FROM " + tableName
+ + " ORDER BY col1+col2, col4");
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(3));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(3));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(3));
+ assertFalse(rs.next());
+ } catch (SQLException e) {
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testColumnFamily() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName +
+ " (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer, col2 integer" +
+ " CONSTRAINT pk PRIMARY KEY (a_string))\n";
+ createTestTable(getUrl(), ddl);
+ String dml = "UPSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setString(1, "a");
+ stmt.setInt(2, 40);
+ stmt.setString(3, "aa");
+ stmt.setInt(4, 10);
+ stmt.setString(5, "bb");
+ stmt.setInt(6, 20);
+ stmt.setInt(7, 1);
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setInt(2, 30);
+ stmt.setString(3, "cc");
+ stmt.setInt(4, 50);
+ stmt.setString(5, "dd");
+ stmt.setInt(6, 60);
+ stmt.setInt(7, 3);
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setInt(2, 40);
+ stmt.setString(3, "bb");
+ stmt.setInt(4, 5);
+ stmt.setString(5, "aa");
+ stmt.setInt(6, 80);
+ stmt.setInt(7, 2);
+ stmt.execute();
+ conn.commit();
+
+
+ List<String> columns =
+ Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D",
+ "COL2");
+
+ SQLContext sqlContext = SparkUtil.getSqlContext();
+ Dataset phoenixDataSet =
+ new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
+ JavaConverters.collectionAsScalaIterableConverter(columns).asScala()
+ .toSeq(),
+ Option.apply((String) null), Option.apply(url), config, false, null)
+ .toDataFrame(sqlContext);
+
+ phoenixDataSet.registerTempTable(tableName);
+ Dataset<Row> dataset =
+ sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
+ + tableName + " ORDER BY `CF1.A`,`CF2.C`");
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+
+ assertTrue(rs.next());
+ assertEquals("c",rs.getString(1));
+ assertEquals(30,rs.getInt(2));
+ assertEquals("cc",rs.getString(3));
+ assertEquals(50,rs.getInt(4));
+ assertEquals("dd",rs.getString(5));
+ assertEquals(60,rs.getInt(6));
+ assertEquals(3,rs.getInt(7));
+ assertTrue(rs.next());
+ assertEquals("b",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertEquals("bb",rs.getString(3));
+ assertEquals(5,rs.getInt(4));
+ assertEquals("aa",rs.getString(5));
+ assertEquals(80,rs.getInt(6));
+ assertEquals(2,rs.getInt(7));
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertEquals("aa",rs.getString(3));
+ assertEquals(10,rs.getInt(4));
+ assertEquals("bb",rs.getString(5));
+ assertEquals(20,rs.getInt(6));
+ assertEquals(1,rs.getInt(7));
+ assertFalse(rs.next());
+
+ dataset =
+ sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
+ + tableName + " ORDER BY COL2");
+ rows = dataset.collectAsList();
+ rs = new SparkResultSet(rows, dataset.columns());
+
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertEquals("aa",rs.getString(3));
+ assertEquals(10,rs.getInt(4));
+ assertEquals("bb",rs.getString(5));
+ assertEquals(20,rs.getInt(6));
+ assertEquals(1,rs.getInt(7));
+ assertTrue(rs.next());
+ assertEquals("b",rs.getString(1));
+ assertEquals(40,rs.getInt(2));
+ assertEquals("bb",rs.getString(3));
+ assertEquals(5,rs.getInt(4));
+ assertEquals("aa",rs.getString(5));
+ assertEquals(80,rs.getInt(6));
+ assertEquals(2,rs.getInt(7));
+ assertTrue(rs.next());
+ assertEquals("c",rs.getString(1));
+ assertEquals(30,rs.getInt(2));
+ assertEquals("cc",rs.getString(3));
+ assertEquals(50,rs.getInt(4));
+ assertEquals("dd",rs.getString(5));
+ assertEquals(60,rs.getInt(6));
+ assertEquals(3,rs.getInt(7));
+ assertFalse(rs.next());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java
new file mode 100644
index 0000000..d72acbd
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.phoenix.end2end.salted.BaseSaltedTableIT;
+import org.apache.phoenix.util.QueryBuilder;
+
+public class SaltedTableIT extends BaseSaltedTableIT {
+
+ @Override
+ protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+ String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
+ ResultSet rs = null;
+ try {
+ rs = executeQuery(conn, queryBuilder);
+ fail();
+ }
+ catch(Exception e) {
+ assertTrue(e.getMessage().contains(expectedSparkExceptionMsg));
+ }
+ return rs;
+ }
+
+ @Override
+ protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+ return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
new file mode 100644
index 0000000..6285209
--- /dev/null
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -0,0 +1,87 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.QueryBuilder;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.SparkPlan;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+public class SparkUtil {
+
+ public static final String APP_NAME = "Java Spark Tests";
+ public static final String NUM_EXECUTORS = "local[2]";
+ public static final String UI_SHOW_CONSOLE_PROGRESS = "spark.ui.showConsoleProgress";
+
+ public static SparkContext getSparkContext() {
+ return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
+ .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sparkContext();
+ }
+
+ public static SQLContext getSqlContext() {
+ return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
+ .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sqlContext();
+ }
+
+ public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
+ throws SQLException {
+ SQLContext sqlContext = SparkUtil.getSqlContext();
+
+ boolean forceRowKeyOrder =
+ conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()
+ .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, false);
+ // if we are forcing row key order we have to add an ORDER BY
+ // here we assume that the required columns are in the primary key column order
+ String prevOrderBy = queryBuilder.getOrderByClause();
+ if (forceRowKeyOrder && (queryBuilder.getOrderByClause()==null || queryBuilder.getOrderByClause().isEmpty())) {
+ queryBuilder.setOrderByClause(Joiner.on(", ").join(queryBuilder.getRequiredColumns()));
+ }
+
+ // create PhoenixRDD using the table name and columns that are required by the query
+ // since we don't set the predicate, filtering is done after rows are returned from Spark
+ Dataset phoenixDataSet =
+ new PhoenixRDD(SparkUtil.getSparkContext(), queryBuilder.getFullTableName(),
+ JavaConverters.collectionAsScalaIterableConverter(queryBuilder.getRequiredColumns()).asScala()
+ .toSeq(),
+ Option.apply((String) null), Option.apply(url), config, false,
+ null).toDataFrame(sqlContext);
+
+ phoenixDataSet.registerTempTable(queryBuilder.getFullTableName());
+ Dataset<Row> dataset = sqlContext.sql(queryBuilder.build());
+ SparkPlan plan = dataset.queryExecution().executedPlan();
+ List<Row> rows = dataset.collectAsList();
+ queryBuilder.setOrderByClause(prevOrderBy);
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+ return rs;
+ }
+}
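
A minimal sketch of how the phoenix-spark ITs above drive this helper, mirroring AggregateIT; conn, getUrl() and config come from the shared test base class, and the table name is hypothetical:

    // Run COUNT(*) through Spark SQL rather than the Phoenix JDBC driver and
    // read the result back through the SparkResultSet wrapper.
    QueryBuilder queryBuilder = new QueryBuilder()
        .setSelectExpression("COUNT(*)")
        .setFullTableName("MY_TABLE");                  // hypothetical table name
    ResultSet rs = SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config);
    assertTrue(rs.next());
    assertEquals(expectedRowCount, rs.getLong(1));      // expectedRowCount is a placeholder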
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c509d58f/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 4e11acc..d1e38fa 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -23,6 +23,7 @@ import org.joda.time.DateTime
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
+
/**
* Note: If running directly from an IDE, these are the recommended VM parameters:
* -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
@@ -287,11 +288,11 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
val plan = res.queryExecution.sparkPlan
// filters should be pushed into phoenix relation
- assert(plan.toString.contains("PushedFilters: [IsNotNull(COL1), IsNotNull(ID), " +
- "EqualTo(COL1,test_row_1), EqualTo(ID,1)]"))
+ assert(plan.toString.contains("PushedFilters: [*IsNotNull(COL1), *IsNotNull(ID), " +
+ "*EqualTo(COL1,test_row_1), *EqualTo(ID,1)]"))
// spark should run the filters on the rows returned by Phoenix
- assert(!plan.toString.contains("Filter (((isnotnull(COL1#8) && isnotnull(ID#7L)) " +
- "&& (COL1#8 = test_row_1)) && (ID#7L = 1))"))
+ assert(!plan.toString.matches(".*Filter (((isnotnull(COL1.*) && isnotnull(ID.*)) "
+ + " && (COL1.* = test_row_1)) && (ID.* = 1)).*"))
}
test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {