Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/25 23:49:02 UTC

[GitHub] [hudi] yihua commented on a diff in pull request #5943: [HUDI-4186] Support Hudi with Spark 3.3.0

yihua commented on code in PR #5943:
URL: https://github.com/apache/hudi/pull/5943#discussion_r929309926


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -138,4 +139,30 @@ trait SparkAdapter extends Serializable {
    * TODO move to HoodieCatalystExpressionUtils
    */
   def createInterpretedPredicate(e: Expression): InterpretedPredicate
+
+  /**
+   * Create instance of [[HoodieFileScanRDD]]
+   * SPARK-37273 FileScanRDD constructor changed in SPARK 3.3
+   */
+  def createHoodieFileScanRDD(sparkSession: SparkSession,
+                              readFunction: PartitionedFile => Iterator[InternalRow],
+                              filePartitions: Seq[FilePartition],
+                              readDataSchema: StructType,
+                              metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD
+
+  /**
+   * Resolve [[DeleteFromTable]]
+   * SPARK-38626 condition is no longer Option in Spark 3.3
+   */
+  def resolveDeleteFromTable(dft: Command,
+                             resolveExpression: Expression => Expression): LogicalPlan
+
+  /**
+   * Get parseQuery from ExtendedSqlParser, only for Spark 3.3+
+   */
+  def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
+                                          sqlText: String): LogicalPlan = {
+    // unsupported by default
+    throw new UnsupportedOperationException(s"Unsupported parseQuery method in Spark earlier than Spark 3.3.0")

Review Comment:
   To clarify, I see that `HoodieCommonSqlParser` uses this API now.  Does it affect SQL in pre-Spark 3.3 versions?  If not, how is that handled?
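
For context, a minimal sketch of how a Spark 3.3 adapter could override this default, while pre-3.3 adapters simply inherit the throwing implementation above (the delegation shown is an illustrative assumption, not necessarily the actual Hudi implementation):

override def getQueryParserFromExtendedSqlParser(session: SparkSession,
                                                 delegate: ParserInterface,
                                                 sqlText: String): LogicalPlan = {
  // parseQuery is only available on ParserInterface from Spark 3.3.0 onward,
  // which is why the base trait throws for earlier Spark versions.
  delegate.parseQuery(sqlText)
}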



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkAdapterSupport.scala:
##########
@@ -27,7 +27,9 @@ import org.apache.spark.sql.hudi.SparkAdapter
 trait SparkAdapterSupport {
 
   lazy val sparkAdapter: SparkAdapter = {
-    val adapterClass = if (HoodieSparkUtils.isSpark3_2) {
+    val adapterClass =  if (HoodieSparkUtils.gteqSpark3_3_0) {

Review Comment:
   Similar to above, is `gteqSpark3_3` good enough here?
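
To make the question concrete, the dispatch this comment refers to would look roughly like the following; the adapter class names are placeholders for illustration only:

val adapterClass =
  if (HoodieSparkUtils.gteqSpark3_3) {                   // the check the reviewer is suggesting
    "org.apache.spark.sql.adapter.Spark3_3Adapter"       // placeholder class name
  } else if (HoodieSparkUtils.isSpark3_2) {
    "org.apache.spark.sql.adapter.Spark3_2Adapter"       // placeholder class name
  } else if (HoodieSparkUtils.isSpark3_1) {
    "org.apache.spark.sql.adapter.Spark3_1Adapter"       // placeholder class name
  } else {
    "org.apache.spark.sql.adapter.Spark2Adapter"         // placeholder class name
  }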



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java:
##########
@@ -71,11 +72,20 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair
   }
 
   private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
-    MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);
+    Configuration hadoopConf = context.getHadoopConf().get();
+    MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath);
+
+    hadoopConf.set(
+        SQLConf.PARQUET_BINARY_AS_STRING().key(),
+        SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
+    hadoopConf.set(
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
+    hadoopConf.set(
+        SQLConf.CASE_SENSITIVE().key(),
+        SQLConf.CASE_SENSITIVE().defaultValueString());
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(hadoopConf);

Review Comment:
   nit: For such backward-compatibility logic, should we just create a new method in the SparkAdapter for different Spark versions?  That way, the code and logic for the existing supported Spark versions stay unchanged.  I'm a little paranoid about changes to existing Spark versions, which risk breaking things in ways that are hard to track.
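
A rough sketch of the kind of adapter hook this comment is asking for; the method name and signature are hypothetical, not taken from the PR:

// Hypothetical SparkAdapter addition: each Spark-versioned adapter wires up
// ParquetToSparkSchemaConverter with whatever constructor and flags that Spark
// release expects, so HoodieSparkBootstrapSchemaProvider never has to set
// SQLConf keys on the shared Hadoop configuration itself.
def convertParquetSchemaToStructType(hadoopConf: Configuration,
                                     parquetSchema: MessageType): StructType

The bootstrap provider would then call this method instead of constructing the converter directly, leaving the pre-3.3 code paths untouched.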



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -52,11 +52,14 @@ private[hudi] trait SparkVersionsSupport {
   def isSpark3_0: Boolean = getSparkVersion.startsWith("3.0")
   def isSpark3_1: Boolean = getSparkVersion.startsWith("3.1")
   def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
+  def isSpark3_3: Boolean = getSparkVersion.startsWith("3.3")
 
   def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
   def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
   def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
   def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
+  def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
+  def gteqSpark3_3_0: Boolean = getSparkVersion >= "3.3.0"

Review Comment:
   We don't need the patch-version-level check for Spark 3.3 at this point, right?
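
For reference, the gates above are plain string comparisons on the Spark version, so for any released 3.3.x version both checks agree:

// Lexicographic String comparison, the same mechanism as the defs above.
"3.3.0" >= "3.3"    // true -> gteqSpark3_3 already covers 3.3.0
"3.3.1" >= "3.3"    // true
"3.3.0" >= "3.3.0"  // true -> gteqSpark3_3_0 adds nothing today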



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -91,7 +93,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
       hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
     )
 
-    new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
+    // SPARK-37273 FileScanRDD constructor changed in SPARK 3.3

Review Comment:
   @CTTY is this resolved?
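
For reference, the replacement line is cut off in this hunk; a plausible shape, using the createHoodieFileScanRDD hook declared in the SparkAdapter hunk above (the exact argument wiring is an assumption):

// Construction is routed through the adapter so each Spark version can call
// the FileScanRDD constructor it actually has (SPARK-37273).
sparkAdapter.createHoodieFileScanRDD(sparkSession, baseFileReader, fileSplits,
  requiredStructTypeSchema)  // placeholder for the relation's required StructType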



##########
hudi-spark-datasource/README.md:
##########
@@ -21,17 +21,19 @@ This repo contains the code that integrate Hudi with Spark. The repo is split in
 
 `hudi-spark`
 `hudi-spark2`
-`hudi-spark3`
 `hudi-spark3.1.x`
+`hudi-spark3.2.x`
+`hudi-spark3.3.x`

Review Comment:
   nit: We should have `hudi-spark3.1`, `hudi-spark3.2`, and `hudi-spark3.3` as shorter names.  @CTTY, could you create a ticket to track this?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java:
##########
@@ -130,6 +130,12 @@ public class HoodieStorageConfig extends HoodieConfig {
       .defaultValue("TIMESTAMP_MICROS")
       .withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");
 
+  public static final ConfigProperty<String> PARQUET_FIELD_ID_WRITE_ENABLED = ConfigProperty
+      .key("hoodie.parquet.fieldId.write.enabled")
+      .defaultValue("true")
+      .withDocumentation("Sets spark.sql.parquet.fieldId.write.enabled. "
+          + "If enabled, Spark will write out parquet native field ids that are stored inside StructField's metadata as parquet.field.id to parquet files.");
+

Review Comment:
   @CTTY, could you help me understand where this check is enforced?  From the Spark docs, I see that the default value of `spark.sql.parquet.fieldId.write.enabled` is true, so I suppose we don't have to set it explicitly?
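
For context, if Hudi does need to force the value, the typical pattern would be to mirror the Hudi config onto the Hadoop conf handed to the Parquet writer; the accessor below is an assumption for illustration:

// Sketch only: copy the Hudi-level setting to the Spark/Parquet key.  If the
// user never overrides hoodie.parquet.fieldId.write.enabled, this just
// re-asserts Spark 3.3's own default of true.
hadoopConf.set(
  "spark.sql.parquet.fieldId.write.enabled",
  writeConfig.getStringOrDefault(HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED))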


