Posted to commits@phoenix.apache.org by st...@apache.org on 2023/04/06 14:10:15 UTC

[phoenix-connectors] branch master updated: PHOENIX-6667 Spark3 connector requires that all columns are specified when writing

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 5abb5db  PHOENIX-6667 Spark3 connector requires that all columns are specified when writing
5abb5db is described below

commit 5abb5db59a7683f557528bd16e8c12f463e22f2a
Author: attilapiros <pi...@gmail.com>
AuthorDate: Wed Apr 5 16:32:36 2023 -0700

    PHOENIX-6667 Spark3 connector requires that all columns are specified when writing
    
    also fixes PHOENIX-6668 Spark3 connector cannot distinguish column name cases
    
    Co-authored-by: Istvan Toth <st...@apache.org>
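
For context, a minimal sketch of the write path this change enables, adapted from the integration test updated below. The table layout mirrors the test's OUTPUT_TEST_TABLE (columns ID, COL1, COL2, COL3); the quorum address and the PhoenixDataSource import path are assumptions for illustration, not part of the commit:

    import org.apache.phoenix.spark.sql.connector.PhoenixDataSource  // import path assumed from the phoenix5-spark3 module
    import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("phoenix-partial-write").getOrCreate()

    // COL3 is deliberately omitted from both the schema and the rows;
    // before this fix the Spark3 connector rejected such a write.
    val schema = StructType(Seq(
      StructField("ID", LongType, nullable = false),
      StructField("COL1", StringType),
      StructField("COL2", IntegerType)))

    val rowRDD = spark.sparkContext.parallelize(
      Seq(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3)))

    spark.sqlContext.createDataFrame(rowRDD, schema)
      .write
      .format("phoenix")
      // "localhost:2181" is a placeholder; the test passes its mini-cluster quorum address.
      .options(Map("table" -> "OUTPUT_TEST_TABLE",
        PhoenixDataSource.ZOOKEEPER_URL -> "localhost:2181"))
      .mode(SaveMode.Append)
      .save()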
---
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  |  2 +-
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 46 +++-------------------
 .../phoenix/spark/sql/connector/PhoenixTable.java  | 17 +++++---
 3 files changed, 18 insertions(+), 47 deletions(-)

diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index ebfd41c..464a588 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -363,7 +363,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     stringValue shouldEqual "test_row_1"
   }
 
-  test("Can save to phoenix table") {
+  test("Can save to phoenix table from Spark without specifying all the columns") {
     val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
 
     val schema = StructType(
diff --git a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 181b77d..85f590d 100644
--- a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -363,10 +363,10 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     stringValue shouldEqual "test_row_1"
   }
 
-  // This works with Spark2, but Spark3 enforces specifying every column
-  ignore("Can save to phoenix table from Spark2") {
+  test("Can save to phoenix table from Spark without specifying all the columns") {
     val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
 
+    // COL3 is missing both from the schema and from the dataset
     val schema = StructType(
       Seq(StructField("ID", LongType, nullable = false),
         StructField("COL1", StringType),
@@ -390,42 +390,10 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     while (rs.next()) {
       results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
     }
-  }
-
-  test("Can save to phoenix table from Spark3") {
-    //We must specify every column for writing for Spark3
-    val dataSet = List(Row(1L, "1", 1, null), Row(2L, "2", 2, null), Row(3L, "3", 3, null))
-    //But partial reads are OK
-    val dataSetWoCol3 = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
-
-    val schema = StructType(
-      Seq(StructField("ID", LongType, nullable = false),
-        StructField("COL1", StringType),
-        StructField("COL2", IntegerType),
-        StructField("COL3", DateType)))
-
-    val rowRDD = spark.sparkContext.parallelize(dataSet)
-
-    // Apply the schema to the RDD.
-    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
-
-    df.write
-      .format("phoenix")
-      .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
-      .mode(SaveMode.Append)
-      .save()
-
-    // Load the results back
-    val stmt = conn.createStatement()
-    val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
-    val results = ListBuffer[Row]()
-    while (rs.next()) {
-      results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
-    }
 
     // Verify they match
     (0 to results.size - 1).foreach { i =>
-      dataSetWoCol3(i) shouldEqual results(i)
+      dataSet(i) shouldEqual results(i)
     }
   }
 
@@ -435,15 +403,14 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
     val records = new mutable.MutableList[Row]
     for (x <- 1 to totalRecords) {
-      records += Row(x.toLong, x.toString, x, null)
+      records += Row(x.toLong, x.toString, x)
     }
     val dataSet = records.toList
 
     val schema = StructType(
       Seq(StructField("ID", LongType, nullable = false),
         StructField("COL1", StringType),
-        StructField("COL2", IntegerType),
-        StructField("COL3", DateType)))
+        StructField("COL2", IntegerType)))
 
     // Distribute the dataset into an RDD with just 1 partition so we use only 1 executor.
     // This makes it easy to deterministically count the batched commits from that executor
@@ -616,8 +583,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     count shouldEqual 1L
   }
 
-  //Spark3 doesn't seem to be able to handle case sensitive column names
-  ignore("Ensure DataFrame field normalization (PHOENIX-2196)") {
+  test("Ensure DataFrame field normalization (PHOENIX-2196)") {
     val rdd1 = spark.sparkContext
       .parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
       .map(p => Row(p._1, p._2, p._3))
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
index b947771..fdd4217 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
@@ -17,9 +17,16 @@
  */
 package org.apache.phoenix.spark.sql.connector;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
+import static org.apache.spark.sql.connector.catalog.TableCapability.ACCEPT_ANY_SCHEMA;
+import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ;
+import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_WRITE;
+
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.phoenix.spark.sql.connector.reader.PhoenixScanBuilder;
 import org.apache.phoenix.spark.sql.connector.writer.PhoenixWriteBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -29,16 +36,14 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-import java.util.Map;
-import java.util.Set;
-
 
 public class PhoenixTable implements SupportsRead, SupportsWrite{
 
     private final Map<String,String> options;
     private final String tableName;
     private final StructType schema;
-    private static final Set<TableCapability> capabilities = ImmutableSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE);
+    private static final Set<TableCapability> CAPABILITIES =
+      ImmutableSet.of(BATCH_READ, BATCH_WRITE, ACCEPT_ANY_SCHEMA);
 
     public PhoenixTable(StructType schema, Map<String,String> options) {
         this.options = options;
@@ -63,7 +68,7 @@ public class PhoenixTable implements SupportsRead, SupportsWrite{
 
     @Override
     public Set<TableCapability> capabilities() {
-        return capabilities;
+        return CAPABILITIES;
     }
 
     public Map<String, String> getOptions() {
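
The ACCEPT_ANY_SCHEMA capability added above tells Spark not to require the incoming DataFrame schema to match the table schema, which is what allows the partial writes exercised by the updated tests. The re-enabled PHOENIX-2196 test covers the PHOENIX-6668 half of the fix: unquoted DataFrame field names given in lower case are normalized to the Phoenix table's upper-case column names on write. A rough sketch of that usage, with a hypothetical table TABLE1 (columns ID, COL1) standing in for the test's fixtures:

    import org.apache.phoenix.spark.sql.connector.PhoenixDataSource  // import path assumed from the phoenix5-spark3 module
    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("phoenix-field-normalization").getOrCreate()

    // Lower-case field names; the connector normalizes them to ID and COL1.
    val df = spark.createDataFrame(Seq((1L, "One"), (2L, "Two"))).toDF("id", "col1")

    df.write
      .format("phoenix")
      .options(Map("table" -> "TABLE1",
        PhoenixDataSource.ZOOKEEPER_URL -> "localhost:2181"))  // placeholder quorum address
      .mode(SaveMode.Append)
      .save()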