You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2022/04/07 15:33:26 UTC
[phoenix-connectors] branch master updated: PHOENIX-6683 Surround the OR filters with parentheses while convertin… (#73)
This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc123b PHOENIX-6683 Surround the OR filters with parentheses while convertin… (#73)
1cc123b is described below
commit 1cc123bd57ed51b40a3ad9edaac9a815b2bf4a32
Author: Rajeshbabu Chintaguntla <ch...@gmail.com>
AuthorDate: Thu Apr 7 21:03:21 2022 +0530
PHOENIX-6683 Surround the OR filters with parentheses while convertin… (#73)
---
phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java | 67 ++++++++++++++++++++++
phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala | 2 +-
phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java | 67 ++++++++++++++++++++++
phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala | 2 +-
4 files changed, 136 insertions(+), 2 deletions(-)
diff --git a/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 664bc94..043bb32 100644
--- a/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -282,6 +283,72 @@ public class OrderByIT extends BaseOrderByIT {
}
}
+ @Test
+ public void testCombinationOfOrAndFilters() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName1 = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName1 +
+ " (ENTITY_INSTANCE_ID BIGINT NOT NULL, MEMBER_ID VARCHAR(50) NOT NULL, CASE_ID VARCHAR(250) NOT NULL," +
+ " CANCELLATION_FLAG VARCHAR(1), CASE_MATCH_TYPE CHAR(1), CONSTRAINT PK1 PRIMARY KEY(ENTITY_INSTANCE_ID, " +
+ "MEMBER_ID, CASE_ID))\n";
+ createTestTable(getUrl(), ddl);
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName1)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName1);
+ String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setInt(1, 40);
+ stmt.setString(2, "a");
+ stmt.setString(3, "b");
+ stmt.setString(4, "Y");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "c");
+ stmt.setString(3, "d");
+ stmt.setNull(4, Types.VARCHAR);
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "e");
+ stmt.setString(3, "f");
+ stmt.setString(4, "N");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 41);
+ stmt.setString(2, "f");
+ stmt.setString(3, "g");
+ stmt.setString(4, "N");
+ stmt.setString(5, "C");
+ stmt.execute();
+ conn.commit();
+ String query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and " +
+ "(CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL ) and CASE_MATCH_TYPE='M'";
+ Dataset<Row> dataset =
+ sqlContext.sql(query);
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and CASE_MATCH_TYPE='M' " +
+ " OR CANCELLATION_FLAG <> 'Y'";
+ dataset =
+ sqlContext.sql(query);
+ rows = dataset.collectAsList();
+ rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1));
+ assertFalse(rs.next());
+ }
+ }
+
@Test
public void testOrderByWithExpression() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
diff --git a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index a2ec2dc..111f021 100644
--- a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -66,7 +66,7 @@ class FilterExpressionCompiler() {
val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
- filter.append(whereLeftClause + " OR " + whereRightClause)
+ filter.append("(" + whereLeftClause + " OR " + whereRightClause + ")")
}
else {
unsupportedFilters :+ f
diff --git a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 0b51436..e7aa424 100644
--- a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -283,6 +284,72 @@ public class OrderByIT extends BaseOrderByIT {
}
}
+ @Test
+ public void testCombinationOfOrAndFilters() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName1 = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName1 +
+ " (ENTITY_INSTANCE_ID BIGINT NOT NULL, MEMBER_ID VARCHAR(50) NOT NULL, CASE_ID VARCHAR(250) NOT NULL," +
+ " CANCELLATION_FLAG VARCHAR(1), CASE_MATCH_TYPE CHAR(1), CONSTRAINT PK1 PRIMARY KEY(ENTITY_INSTANCE_ID, " +
+ "MEMBER_ID, CASE_ID))\n";
+ createTestTable(getUrl(), ddl);
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option("table", tableName1)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName1);
+ String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setInt(1, 40);
+ stmt.setString(2, "a");
+ stmt.setString(3, "b");
+ stmt.setString(4, "Y");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "c");
+ stmt.setString(3, "d");
+ stmt.setNull(4, Types.VARCHAR);
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "e");
+ stmt.setString(3, "f");
+ stmt.setString(4, "N");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 41);
+ stmt.setString(2, "f");
+ stmt.setString(3, "g");
+ stmt.setString(4, "N");
+ stmt.setString(5, "C");
+ stmt.execute();
+ conn.commit();
+ String query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and " +
+ "(CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL ) and CASE_MATCH_TYPE='M'";
+ Dataset<Row> dataset =
+ sqlContext.sql(query);
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and CASE_MATCH_TYPE='M' " +
+ " OR CANCELLATION_FLAG <> 'Y'";
+ dataset =
+ sqlContext.sql(query);
+ rows = dataset.collectAsList();
+ rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1));
+ assertFalse(rs.next());
+ }
+ }
+
@Test
public void testOrderByWithExpression() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
diff --git a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index 8b9b481..77c172a 100644
--- a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -66,7 +66,7 @@ class FilterExpressionCompiler() {
val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
- filter.append(whereLeftClause + " OR " + whereRightClause)
+ filter.append("(" + whereLeftClause + " OR " + whereRightClause + ")")
}
else {
unsupportedFilters :+ f