Posted to commits@phoenix.apache.org by st...@apache.org on 2023/04/06 14:10:15 UTC
[phoenix-connectors] branch master updated: PHOENIX-6667 Spark3 connector requires that all columns are specified when writing
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 5abb5db PHOENIX-6667 Spark3 connector requires that all columns are specified when writing
5abb5db is described below
commit 5abb5db59a7683f557528bd16e8c12f463e22f2a
Author: attilapiros <pi...@gmail.com>
AuthorDate: Wed Apr 5 16:32:36 2023 -0700
PHOENIX-6667 Spark3 connector requires that all columns are specified when writing
also fixes PHOENIX-6668 Spark3 connector cannot distinguish column name cases
Co-authored-by: Istvan Toth <st...@apache.org>
---
.../org/apache/phoenix/spark/PhoenixSparkIT.scala | 2 +-
.../org/apache/phoenix/spark/PhoenixSparkIT.scala | 46 +++-------------------
.../phoenix/spark/sql/connector/PhoenixTable.java | 17 +++++---
3 files changed, 18 insertions(+), 47 deletions(-)
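In practice the change lets a Spark 3 job append a DataFrame that omits nullable columns, which is exactly what the updated integration test below exercises. A minimal sketch of that usage, assuming the OUTPUT_TEST_TABLE fixture from the test suite; the "zkUrl" key stands in for PhoenixDataSource.ZOOKEEPER_URL and the quorum address is environment-specific:

    import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // COL3 is deliberately absent from both the schema and the rows;
    // after this fix the connector simply leaves it NULL on write.
    val schema = StructType(Seq(
      StructField("ID", LongType, nullable = false),
      StructField("COL1", StringType),
      StructField("COL2", IntegerType)))

    val rows = spark.sparkContext.parallelize(
      Seq(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3)))

    spark.sqlContext.createDataFrame(rows, schema)
      .write
      .format("phoenix")
      // "localhost:2181" is a placeholder; the IT suite passes its own quorum address.
      .options(Map("table" -> "OUTPUT_TEST_TABLE", "zkUrl" -> "localhost:2181"))
      .mode(SaveMode.Append)
      .save()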
diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index ebfd41c..464a588 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -363,7 +363,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
stringValue shouldEqual "test_row_1"
}
- test("Can save to phoenix table") {
+ test("Can save to phoenix table from Spark without specifying all the columns") {
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
val schema = StructType(
diff --git a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 181b77d..85f590d 100644
--- a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -363,10 +363,10 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
stringValue shouldEqual "test_row_1"
}
- // This works with Spark2, but Spark3 enforces specifying every column
- ignore("Can save to phoenix table from Spark2") {
+ test("Can save to phoenix table from Spark without specifying all the columns") {
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
+ // COL3 is missing both from the schema and from the dataset
val schema = StructType(
Seq(StructField("ID", LongType, nullable = false),
StructField("COL1", StringType),
@@ -390,42 +390,10 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
while (rs.next()) {
results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
}
- }
-
- test("Can save to phoenix table from Spark3") {
- //We must specify every column for writing for Spark3
- val dataSet = List(Row(1L, "1", 1, null), Row(2L, "2", 2, null), Row(3L, "3", 3, null))
- //But partial reads are OK
- val dataSetWoCol3 = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
-
- val schema = StructType(
- Seq(StructField("ID", LongType, nullable = false),
- StructField("COL1", StringType),
- StructField("COL2", IntegerType),
- StructField("COL3", DateType)))
-
- val rowRDD = spark.sparkContext.parallelize(dataSet)
-
- // Apply the schema to the RDD.
- val df = spark.sqlContext.createDataFrame(rowRDD, schema)
-
- df.write
- .format("phoenix")
- .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
- .mode(SaveMode.Append)
- .save()
-
- // Load the results back
- val stmt = conn.createStatement()
- val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
- val results = ListBuffer[Row]()
- while (rs.next()) {
- results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
- }
// Verify they match
(0 to results.size - 1).foreach { i =>
- dataSetWoCol3(i) shouldEqual results(i)
+ dataSet(i) shouldEqual results(i)
}
}
@@ -435,15 +403,14 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
val records = new mutable.MutableList[Row]
for (x <- 1 to totalRecords) {
- records += Row(x.toLong, x.toString, x, null)
+ records += Row(x.toLong, x.toString, x)
}
val dataSet = records.toList
val schema = StructType(
Seq(StructField("ID", LongType, nullable = false),
StructField("COL1", StringType),
- StructField("COL2", IntegerType),
- StructField("COL3", DateType)))
+ StructField("COL2", IntegerType)))
// Distribute the dataset into an RDD with just 1 partition so we use only 1 executor.
// This makes it easy to deterministically count the batched commits from that executor
@@ -616,8 +583,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
count shouldEqual 1L
}
- //Spark3 doesn't seem to be able to handle case sensitive column names
- ignore("Ensure DataFrame field normalization (PHOENIX-2196)") {
+ test("Ensure DataFrame field normalization (PHOENIX-2196)") {
val rdd1 = spark.sparkContext
.parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
.map(p => Row(p._1, p._2, p._3))
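Re-enabling this normalization test relies on the PHOENIX-6668 half of the fix: DataFrame field names are now resolved against Phoenix columns by Phoenix's own identifier rules, upper-casing unquoted names and preserving the case of quoted ones. A sketch of the scenario, with field names and the TABLE2 target following the pattern of the original PHOENIX-2196 test (treat them as illustrative):

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Unquoted, lower-case field names normalize to the upper-case Phoenix
    // columns ID and TABLE1_ID; the quoted "t2col1" keeps its exact case.
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("table1_id", LongType),
      StructField("\"t2col1\"", StringType)))

    val rows = spark.sparkContext.parallelize(
      Seq(Row(1L, 1L, "One"), Row(2L, 2L, "Two")))

    spark.sqlContext.createDataFrame(rows, schema)
      .write
      .format("phoenix")
      .options(Map("table" -> "TABLE2", "zkUrl" -> "localhost:2181")) // placeholder quorum
      .mode(SaveMode.Append)
      .save()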
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
index b947771..fdd4217 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
@@ -17,9 +17,16 @@
*/
package org.apache.phoenix.spark.sql.connector;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
+import static org.apache.spark.sql.connector.catalog.TableCapability.ACCEPT_ANY_SCHEMA;
+import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ;
+import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_WRITE;
+
+import java.util.Map;
+import java.util.Set;
+
import org.apache.phoenix.spark.sql.connector.reader.PhoenixScanBuilder;
import org.apache.phoenix.spark.sql.connector.writer.PhoenixWriteBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -29,16 +36,14 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-import java.util.Map;
-import java.util.Set;
-
public class PhoenixTable implements SupportsRead, SupportsWrite{
private final Map<String,String> options;
private final String tableName;
private final StructType schema;
- private static final Set<TableCapability> capabilities = ImmutableSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE);
+ private static final Set<TableCapability> CAPABILITIES =
+ ImmutableSet.of(BATCH_READ, BATCH_WRITE, ACCEPT_ANY_SCHEMA);
public PhoenixTable(StructType schema, Map<String,String> options) {
this.options = options;
@@ -63,7 +68,7 @@ public class PhoenixTable implements SupportsRead, SupportsWrite{
@Override
public Set<TableCapability> capabilities() {
- return capabilities;
+ return CAPABILITIES;
}
public Map<String, String> getOptions() {
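The core of the fix is the capabilities change: advertising ACCEPT_ANY_SCHEMA makes Spark 3 skip its exact-schema validation on write and leave column resolution to the connector, which is what allows the tests above to drop COL3. A stripped-down sketch of the pattern in Scala, for consistency with the tests (the class name and constructor are illustrative; the real PhoenixTable also implements SupportsRead and returns a PhoenixWriteBuilder):

    import java.util.{EnumSet => JEnumSet, Set => JSet}

    import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
    import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
    import org.apache.spark.sql.types.StructType

    class PartialWriteTable(tableName: String, tableSchema: StructType)
        extends SupportsWrite {

      override def name(): String = tableName

      override def schema(): StructType = tableSchema

      // Without ACCEPT_ANY_SCHEMA, Spark rejects any write whose DataFrame
      // schema differs from schema(); with it, missing nullable columns
      // become the connector's responsibility.
      override def capabilities(): JSet[TableCapability] =
        JEnumSet.of(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA)

      // Write path elided for the sketch.
      override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???
    }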