Posted to commits@phoenix.apache.org by st...@apache.org on 2023/11/30 11:26:55 UTC

(phoenix-connectors) 02/07: PHOENIX-7065 Spark3 connector tests fail with Spark 3.4.1

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git

commit e7ad177c40f0c34e9120ddb13ac337486c282800
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Nov 23 21:06:13 2023 +0100

    PHOENIX-7065 Spark3 connector tests fail with Spark 3.4.1
    
    Fix test classpath issue
    Use spark.hadoopRDD.ignoreEmptySplits=false for Spark3 tests
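
    For context, the fix amounts to setting one Spark property before the session is
    created. Below is a minimal sketch (not part of the patch) that mirrors the README
    and test hunks further down; the note about the Spark 3.4 default is an inference,
    not something the commit itself states.

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the configuration this commit adds to the examples and tests:
// disable spark.hadoopRDD.ignoreEmptySplits so that Phoenix input splits
// reporting zero length are not dropped. (That Spark 3.4+ defaults this
// property to true is an assumption; the patch only sets it to "false".)
val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
  .getOrCreate()
```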
---
 phoenix5-spark/README.md                            | 18 +++++++++++++-----
 phoenix5-spark/pom.xml                              | 13 +++++++++++++
 .../org/apache/phoenix/spark/DataSourceApiIT.java   |  6 ++++--
 .../it/java/org/apache/phoenix/spark/SparkUtil.java |  3 ++-
 .../phoenix/spark/AbstractPhoenixSparkIT.scala      |  2 ++
 phoenix5-spark3-it/pom.xml                          | 15 +++++++++++++--
 .../phoenix/spark/AbstractPhoenixSparkIT.scala      |  1 +
 phoenix5-spark3/README.md                           | 21 ++++++++++++++++-----
 phoenix5-spark3/pom.xml                             |  4 ++--
 pom.xml                                             |  6 ++++--
 10 files changed, 70 insertions(+), 19 deletions(-)

diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md
index f443cb0..73d68c2 100644
--- a/phoenix5-spark/README.md
+++ b/phoenix5-spark/README.md
@@ -38,6 +38,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 // Load data from TABLE1
@@ -62,7 +63,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkRead {
     
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         
@@ -109,6 +111,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
   
 // Load INPUT_TABLE
@@ -137,7 +140,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkWriteFromInputTable {
     
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+          .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         
@@ -183,6 +187,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
   
 val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -223,7 +228,8 @@ import java.util.List;
 public class PhoenixSparkWriteFromRDDWithSchema {
  
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         SparkSession spark = sqlContext.sparkSession();
@@ -306,7 +312,8 @@ import org.apache.phoenix.spark._
 val configuration = new Configuration()
 // Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
 
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 val sqlContext = new SQLContext(sc)
 
 // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
@@ -324,7 +331,8 @@ import org.apache.spark.sql.SQLContext
 import org.apache.phoenix.spark._
 import org.apache.spark.rdd.RDD
 
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 
 // Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
 val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
diff --git a/phoenix5-spark/pom.xml b/phoenix5-spark/pom.xml
index cd33ca3..78eedfa 100644
--- a/phoenix5-spark/pom.xml
+++ b/phoenix5-spark/pom.xml
@@ -36,6 +36,8 @@
   <properties>
     <top.dir>${project.basedir}/..</top.dir>
     <skip-scala-tests>true</skip-scala-tests>
+    <scala.version>${scala.version.for.spark2}</scala.version>
+    <scala.binary.version>${scala.binary.version.for.spark2}</scala.binary.version>
   </properties>
 
   <dependencies>
@@ -58,6 +60,17 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
+        <exclusions>
+        <!-- The shaded hadoop-client libraries conflict with hbase-shaded-mapreduce -->
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-runtime</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>provided</scope>
     </dependency>
     <dependency>
diff --git a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
index c6a4465..bc2637d 100644
--- a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
+++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
@@ -70,7 +70,8 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT {
 
     @Test
     public void basicWriteAndReadBackTest() throws SQLException {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+                .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         String tableName = generateUniqueName();
@@ -165,7 +166,8 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT {
     @Test
     @Ignore // Spark3 seems to be unable to handle mixed case column names
     public void lowerCaseWriteTest() throws SQLException {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+                .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         String tableName = generateUniqueName();
diff --git a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
index 1c36f01..f00dadb 100644
--- a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
+++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -45,7 +45,8 @@ public class SparkUtil {
 
     public static SparkSession getSparkSession() {
         return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
-                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate();
+                .config(UI_SHOW_CONSOLE_PROGRESS, false)
+                .config("spark.hadoopRDD.ignoreEmptySplits", false).getOrCreate();
     }
 
     public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
diff --git a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index a10d303..1b24403 100644
--- a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -103,6 +103,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
     // We pass in a TenantId to allow the DDL to create tenant-specific tables/views
     setupTables("tenantSetup.sql", Some(TenantId))
 
+    //FIXME is this ever used ?
     val conf = new SparkConf()
       .setAppName("PhoenixSparkIT")
       .setMaster("local[2]") // 2 threads, some parallelism
@@ -113,6 +114,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
       .appName("PhoenixSparkIT")
       .master("local[2]") // 2 threads, some parallelism
       .config("spark.ui.showConsoleProgress", "false")
+      .config("spark.hadoopRDD.ignoreEmptySplits", "false")
       .getOrCreate()
   }
 
diff --git a/phoenix5-spark3-it/pom.xml b/phoenix5-spark3-it/pom.xml
index 3d8c25b..5ce08df 100644
--- a/phoenix5-spark3-it/pom.xml
+++ b/phoenix5-spark3-it/pom.xml
@@ -35,8 +35,8 @@
   <properties>
     <top.dir>${project.basedir}/..</top.dir>
     <spark.version>${spark3.version}</spark.version>
-    <scala.version>2.12.10</scala.version>
-    <scala.binary.version>2.12</scala.binary.version>
+    <scala.version>${scala.version.for.spark3}</scala.version>
+    <scala.binary.version>${scala.binary.version.for.spark3}</scala.binary.version>
   </properties>
 
   <dependencies>
@@ -47,6 +47,17 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <!-- The shaded hadoop-client libraries conflict with the minicluster -->
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-runtime</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index 3308b82..12e679b 100644
--- a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -108,6 +108,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
       .appName("PhoenixSparkIT")
       .master("local[2]") // 2 threads, some parallelism
       .config("spark.ui.showConsoleProgress", "false")
+      .config("spark.hadoopRDD.ignoreEmptySplits", "false")
       .getOrCreate()
   }
 
diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 824886d..ec7684a 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -48,6 +48,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 // Load data from TABLE1
@@ -72,7 +73,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkRead {
     
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         
@@ -119,6 +121,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
   
 // Load INPUT_TABLE
@@ -147,7 +150,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkWriteFromInputTable {
     
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         
@@ -193,6 +197,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
   
 val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -233,7 +238,8 @@ import java.util.List;
 public class PhoenixSparkWriteFromRDDWithSchema {
  
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         SparkSession spark = sqlContext.sparkSession();
@@ -308,6 +314,7 @@ create the DataFrame or RDD directly if you need fine-grained configuration.
 ### Load as a DataFrame directly using a Configuration object
 ```scala
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.phoenix.spark._
@@ -315,7 +322,8 @@ import org.apache.phoenix.spark._
 val configuration = new Configuration()
 // Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
 
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 val sqlContext = new SQLContext(sc)
 
 // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
@@ -328,6 +336,7 @@ df.show
 
 ### Load as an RDD, using a Zookeeper URL
 ```scala
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.phoenix.spark._
@@ -358,10 +367,12 @@ CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, co
 ```
 
 ```scala
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.phoenix.spark._
 
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
 
 sc
diff --git a/phoenix5-spark3/pom.xml b/phoenix5-spark3/pom.xml
index 1b583e6..f2cec4a 100644
--- a/phoenix5-spark3/pom.xml
+++ b/phoenix5-spark3/pom.xml
@@ -35,8 +35,8 @@
   <properties>
     <top.dir>${project.basedir}/..</top.dir>
     <spark.version>${spark3.version}</spark.version>
-    <scala.version>2.12.10</scala.version>
-    <scala.binary.version>2.12</scala.binary.version>
+    <scala.version>${scala.version.for.spark3}</scala.version>
+    <scala.binary.version>${scala.binary.version.for.spark3}</scala.binary.version>
     <jodatime.version>2.10.5</jodatime.version>
   </properties>
 
diff --git a/pom.xml b/pom.xml
index b795ff9..5f796e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,9 +98,11 @@
     <hive3-storage.version>2.7.0</hive3-storage.version>
     <hive-storage.version>${hive3-storage.version}</hive-storage.version>
     <spark.version>2.4.0</spark.version>
+    <scala.version.for.spark2>2.11.12</scala.version.for.spark2>
+    <scala.binary.version.for.spark2>2.11</scala.binary.version.for.spark2>
     <spark3.version>3.0.3</spark3.version>
-    <scala.version>2.11.12</scala.version>
-    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version.for.spark3>2.12.18</scala.version.for.spark3>
+    <scala.binary.version.for.spark3>2.12</scala.binary.version.for.spark3>
     
     <log4j.version>1.2.17</log4j.version>
     <log4j2.version>2.18.0</log4j2.version>