You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2023/11/30 11:26:55 UTC
(phoenix-connectors) 02/07: PHOENIX-7065 Spark3 connector tests fail with Spark 3.4.1
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
commit e7ad177c40f0c34e9120ddb13ac337486c282800
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Nov 23 21:06:13 2023 +0100
PHOENIX-7065 Spark3 connector tests fail with Spark 3.4.1
Fix test classpath issue
use spark.hadoopRDD.ignoreEmptySplits=false for Spark3 tests
---
phoenix5-spark/README.md | 18 +++++++++++++-----
phoenix5-spark/pom.xml | 13 +++++++++++++
.../org/apache/phoenix/spark/DataSourceApiIT.java | 6 ++++--
.../it/java/org/apache/phoenix/spark/SparkUtil.java | 3 ++-
.../phoenix/spark/AbstractPhoenixSparkIT.scala | 2 ++
phoenix5-spark3-it/pom.xml | 15 +++++++++++++--
.../phoenix/spark/AbstractPhoenixSparkIT.scala | 1 +
phoenix5-spark3/README.md | 21 ++++++++++++++++-----
phoenix5-spark3/pom.xml | 4 ++--
pom.xml | 6 ++++--
10 files changed, 70 insertions(+), 19 deletions(-)
diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md
index f443cb0..73d68c2 100644
--- a/phoenix5-spark/README.md
+++ b/phoenix5-spark/README.md
@@ -38,6 +38,7 @@ val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
// Load data from TABLE1
@@ -62,7 +63,8 @@ import org.apache.spark.sql.SQLContext;
public class PhoenixSparkRead {
public static void main() throws Exception {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
@@ -109,6 +111,7 @@ val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
// Load INPUT_TABLE
@@ -137,7 +140,8 @@ import org.apache.spark.sql.SQLContext;
public class PhoenixSparkWriteFromInputTable {
public static void main() throws Exception {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
@@ -183,6 +187,7 @@ val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -223,7 +228,8 @@ import java.util.List;
public class PhoenixSparkWriteFromRDDWithSchema {
public static void main() throws Exception {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
SparkSession spark = sqlContext.sparkSession();
@@ -306,7 +312,8 @@ import org.apache.phoenix.spark._
val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
val sqlContext = new SQLContext(sc)
// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
@@ -324,7 +331,8 @@ import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.rdd.RDD
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
diff --git a/phoenix5-spark/pom.xml b/phoenix5-spark/pom.xml
index cd33ca3..78eedfa 100644
--- a/phoenix5-spark/pom.xml
+++ b/phoenix5-spark/pom.xml
@@ -36,6 +36,8 @@
<properties>
<top.dir>${project.basedir}/..</top.dir>
<skip-scala-tests>true</skip-scala-tests>
+ <scala.version>${scala.version.for.spark2}</scala.version>
+ <scala.binary.version>${scala.binary.version.for.spark2}</scala.binary.version>
</properties>
<dependencies>
@@ -58,6 +60,17 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
+ <exclusions>
+ <!-- The shaded hadoop-client libraries conflict with hbase-shaded-mapreduce -->
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ </exclusion>
+ </exclusions>
<scope>provided</scope>
</dependency>
<dependency>
diff --git a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
index c6a4465..bc2637d 100644
--- a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
+++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
@@ -70,7 +70,8 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT {
@Test
public void basicWriteAndReadBackTest() throws SQLException {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
String tableName = generateUniqueName();
@@ -165,7 +166,8 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT {
@Test
@Ignore // Spark3 seems to be unable to handle mixed case column names
public void lowerCaseWriteTest() throws SQLException {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
String tableName = generateUniqueName();
diff --git a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
index 1c36f01..f00dadb 100644
--- a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
+++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -45,7 +45,8 @@ public class SparkUtil {
public static SparkSession getSparkSession() {
return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
- .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate();
+ .config(UI_SHOW_CONSOLE_PROGRESS, false)
+ .config("spark.hadoopRDD.ignoreEmptySplits", false).getOrCreate();
}
public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
diff --git a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index a10d303..1b24403 100644
--- a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -103,6 +103,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
// We pass in a TenantId to allow the DDL to create tenant-specific tables/views
setupTables("tenantSetup.sql", Some(TenantId))
+ // FIXME: is this ever used?
val conf = new SparkConf()
.setAppName("PhoenixSparkIT")
.setMaster("local[2]") // 2 threads, some parallelism
@@ -113,6 +114,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
.appName("PhoenixSparkIT")
.master("local[2]") // 2 threads, some parallelism
.config("spark.ui.showConsoleProgress", "false")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
}
diff --git a/phoenix5-spark3-it/pom.xml b/phoenix5-spark3-it/pom.xml
index 3d8c25b..5ce08df 100644
--- a/phoenix5-spark3-it/pom.xml
+++ b/phoenix5-spark3-it/pom.xml
@@ -35,8 +35,8 @@
<properties>
<top.dir>${project.basedir}/..</top.dir>
<spark.version>${spark3.version}</spark.version>
- <scala.version>2.12.10</scala.version>
- <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>${scala.version.for.spark3}</scala.version>
+ <scala.binary.version>${scala.binary.version.for.spark3}</scala.binary.version>
</properties>
<dependencies>
@@ -47,6 +47,17 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <!-- The shaded hadoop-client libraries conflict with the minicluster -->
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index 3308b82..12e679b 100644
--- a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -108,6 +108,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
.appName("PhoenixSparkIT")
.master("local[2]") // 2 threads, some parallelism
.config("spark.ui.showConsoleProgress", "false")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
}
diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 824886d..ec7684a 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -48,6 +48,7 @@ val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
// Load data from TABLE1
@@ -72,7 +73,8 @@ import org.apache.spark.sql.SQLContext;
public class PhoenixSparkRead {
public static void main() throws Exception {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
@@ -119,6 +121,7 @@ val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
// Load INPUT_TABLE
@@ -147,7 +150,8 @@ import org.apache.spark.sql.SQLContext;
public class PhoenixSparkWriteFromInputTable {
public static void main() throws Exception {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
@@ -193,6 +197,7 @@ val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
+ .config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -233,7 +238,8 @@ import java.util.List;
public class PhoenixSparkWriteFromRDDWithSchema {
public static void main() throws Exception {
- SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+ .set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
SparkSession spark = sqlContext.sparkSession();
@@ -308,6 +314,7 @@ create the DataFrame or RDD directly if you need fine-grained configuration.
### Load as a DataFrame directly using a Configuration object
```scala
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
@@ -315,7 +322,8 @@ import org.apache.phoenix.spark._
val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
val sqlContext = new SQLContext(sc)
// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
@@ -328,6 +336,7 @@ df.show
### Load as an RDD, using a Zookeeper URL
```scala
+import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
@@ -358,10 +367,12 @@ CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, co
```
```scala
+import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
sc
diff --git a/phoenix5-spark3/pom.xml b/phoenix5-spark3/pom.xml
index 1b583e6..f2cec4a 100644
--- a/phoenix5-spark3/pom.xml
+++ b/phoenix5-spark3/pom.xml
@@ -35,8 +35,8 @@
<properties>
<top.dir>${project.basedir}/..</top.dir>
<spark.version>${spark3.version}</spark.version>
- <scala.version>2.12.10</scala.version>
- <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>${scala.version.for.spark3}</scala.version>
+ <scala.binary.version>${scala.binary.version.for.spark3}</scala.binary.version>
<jodatime.version>2.10.5</jodatime.version>
</properties>
diff --git a/pom.xml b/pom.xml
index b795ff9..5f796e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,9 +98,11 @@
<hive3-storage.version>2.7.0</hive3-storage.version>
<hive-storage.version>${hive3-storage.version}</hive-storage.version>
<spark.version>2.4.0</spark.version>
+ <scala.version.for.spark2>2.11.12</scala.version.for.spark2>
+ <scala.binary.version.for.spark2>2.11</scala.binary.version.for.spark2>
<spark3.version>3.0.3</spark3.version>
- <scala.version>2.11.12</scala.version>
- <scala.binary.version>2.11</scala.binary.version>
+ <scala.version.for.spark3>2.12.18</scala.version.for.spark3>
+ <scala.binary.version.for.spark3>2.12</scala.binary.version.for.spark3>
<log4j.version>1.2.17</log4j.version>
<log4j2.version>2.18.0</log4j2.version>