Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/29 23:11:12 UTC

[GitHub] [hudi] umehrot2 commented on a change in pull request #2208: [HUDI-1040] Make Hudi support Spark 3

umehrot2 commented on a change in pull request #2208:
URL: https://github.com/apache/hudi/pull/2208#discussion_r514596118



##########
File path: packaging/hudi-utilities-bundle/pom.xml
##########
@@ -105,6 +106,7 @@
                   <include>io.prometheus:simpleclient_common</include>
                   <include>com.yammer.metrics:metrics-core</include>
                   <include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include>
+                  <include>org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version}</include>

Review comment:
       For my understanding, why is this needed?

##########
File path: LICENSE
##########
@@ -246,6 +246,8 @@ This product includes code from Apache Spark
 
 * org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package
 
+* org.apache.hudi.HoodieSparkUtils.scala copied from org.apache.spark.deploy.SparkHadoopUtil.scala

Review comment:
       Perhaps we can be more specific and say that we `copied some methods`?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -121,6 +122,9 @@ private[hudi] object HoodieSparkSqlWriter {
       // short-circuit if bulk_insert via row is enabled.
       // scalastyle:off
       if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean) {
+        if (SPARK_VERSION.startsWith("3.")) {
+          throw new HoodieException("Bulk insert via row is not compatible with Spark 3, it is only compatible with Spark 2!")
+        }

Review comment:
       Is this not possible through delta streamer? It seems like it is not.

##########
File path: pom.xml
##########
@@ -100,6 +104,7 @@
     <prometheus.version>0.8.0</prometheus.version>
     <http.version>4.4.1</http.version>
     <spark.version>2.4.4</spark.version>
+    <spark2.version>2.4.4</spark2.version>

Review comment:
       I would suggest keeping just `spark.version` here and overriding it in the `hudi-spark2` and `hudi-spark3` modules respectively.

##########
File path: hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
##########
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.internal;
 
-import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.DataSourceUtilsForSpark2;

Review comment:
       Why do we need to move the Hudi datasource itself to `hudi-spark2`? I think we should leave it under `hudi-spark`. If we later want separate datasource implementations, we can create them under the `hudi-spark2` and `hudi-spark3` modules. Thoughts?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
##########
@@ -85,7 +85,7 @@
         .collect(Collectors.toList()));
 
     return inputPaths.stream().map(path -> {
-      setInputPath(jobConf, path);
+      FileInputFormat.setInputPaths(jobConf, path);

Review comment:
       As discussed internally during code review, can you confirm whether this actually converts the paths to point to the local file system rather than HDFS? It would also be good to explain in the PR description why you made this change, for future reference.
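
One quick way to check which file system a test path targets is to inspect its URI scheme. The snippet below is only a minimal sketch using the JDK, not Hudi or Hadoop code: `PathSchemeCheck`, `schemeOf`, and the sample paths are hypothetical names chosen for illustration. A path with no scheme would typically be resolved by Hadoop against the configured default file system, so it is reported separately here.

```java
import java.net.URI;

// Hypothetical helper, not part of Hudi or Hadoop: reports the URI scheme of a path string.
final class PathSchemeCheck {

    // Returns the scheme ("file", "hdfs", ...) or "(none)" when the path has no scheme.
    static String schemeOf(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme == null ? "(none)" : scheme;
    }

    public static void main(String[] args) {
        // Sample paths are assumptions made up for this illustration.
        String[] samples = {
            "file:///tmp/hoodie/2020/10/29",
            "hdfs://namenode:8020/tmp/hoodie",
            "/tmp/hoodie"
        };
        for (String path : samples) {
            System.out.println(path + " -> " + schemeOf(path));
        }
    }
}
```

Logging the scheme of each input path before and after the call would show whether the change alters where the test reads from.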

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -96,4 +98,16 @@ object AvroConversionUtils {
     val name = HoodieAvroUtils.sanitizeName(tableName)
     (s"${name}_record", s"hoodie.${name}")
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // TODO remove reflection if Spark 2.x support is dropped
+    if (SPARK_VERSION.startsWith("2.")) {

Review comment:
       +1. Let's have two separate implementations of the Row deserializer for Spark 2 and Spark 3, as was done in https://github.com/apache/hudi/pull/1760/files
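
To illustrate the shape of that suggestion, here is a rough, dependency-free sketch of selecting one of two implementations by version prefix instead of branching with reflection inside a single method. Every name in it (`RowDeserializer`, `Spark2RowDeserializer`, `Spark3RowDeserializer`, `RowDeserializerFactory`) is hypothetical, and rows are modelled as plain strings so the example runs without Spark; it is not the code from either pull request. In a real build, each implementation would presumably live in its own module (`hudi-spark2` or `hudi-spark3`) and call the encoder API of the Spark version it is compiled against.

```java
// Hypothetical interface standing in for a per-Spark-version row deserializer.
// Rows are modelled as Strings so the sketch has no Spark dependency.
interface RowDeserializer {
    String deserialize(String internalRow);
}

// Stand-in for the implementation that would be compiled against Spark 2.x.
final class Spark2RowDeserializer implements RowDeserializer {
    @Override
    public String deserialize(String internalRow) {
        return "spark2:" + internalRow;
    }
}

// Stand-in for the implementation that would be compiled against Spark 3.x.
final class Spark3RowDeserializer implements RowDeserializer {
    @Override
    public String deserialize(String internalRow) {
        return "spark3:" + internalRow;
    }
}

// Chooses an implementation once, based on the version string.
final class RowDeserializerFactory {
    static RowDeserializer forSparkVersion(String sparkVersion) {
        if (sparkVersion.startsWith("2.")) {
            return new Spark2RowDeserializer();
        }
        if (sparkVersion.startsWith("3.")) {
            return new Spark3RowDeserializer();
        }
        throw new IllegalArgumentException("Unsupported Spark version: " + sparkVersion);
    }

    public static void main(String[] args) {
        RowDeserializer deserializer = forSparkVersion("2.4.4");
        System.out.println(deserializer.deserialize("row-1"));
    }
}
```

With this layout the version check happens in one place, and callers such as `AvroConversionUtils` would only depend on the common interface.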



