Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/02 11:09:59 UTC

[GitHub] [hudi] codope commented on a diff in pull request #6550: [HUDI-4691] Cleaning up duplicated classes in Spark 3.3 module

codope commented on code in PR #6550:
URL: https://github.com/apache/hudi/pull/6550#discussion_r961525890


##########
hudi-spark-datasource/hudi-spark/pom.xml:
##########
@@ -203,41 +207,6 @@
       <artifactId>hudi-sync-common</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.curator</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>${hudi.spark.module}_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hudi</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>${hudi.spark.common.module}</artifactId>

Review Comment:
   So we don't need the spark common module at all in hudi-spark-datasource? Was it redundant earlier?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -94,10 +90,10 @@ object HoodieAnalysis {
       //
       // It's critical for this rules to follow in this order, so that DataSource V2 to V1 fallback
       // is performed prior to other rules being evaluated
-      rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, resolveAlterTableCommands)
+      rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, resolveAlterTableCommands)
 
     } else if (HoodieSparkUtils.gteqSpark3_1) {
-      val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.Spark312ResolveHudiAlterTableCommand"
+      val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.Spark31ResolveHudiAlterTableCommand"

Review Comment:
   Is this a problem if not all writers and readers use the same Hudi version? Maybe not in this case, but I'm calling it out so we think through the class renames. I ran into a similar issue while doing the HBase upgrade (though that was because we actually wrote the KV comparator class name into data files).
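   For context, a minimal sketch of why the rename should be benign here, assuming the rule is only ever resolved reflectively from a name string that ships in the same jar as the class itself (the helper and constructor shape are illustrative, not the actual Hudi code):

       import org.apache.spark.sql.SparkSession
       import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
       import org.apache.spark.sql.catalyst.rules.Rule

       // The rule class is looked up by name when the session extensions are
       // installed, so the name string and the class bytes always come from
       // the same Hudi jar; the renamed name is never persisted anywhere that
       // a different Hudi version could read it back.
       def loadRule(spark: SparkSession, ruleClassName: String): Rule[LogicalPlan] =
         Class.forName(ruleClassName)
           .getConstructor(classOf[SparkSession])
           .newInstance(spark)
           .asInstanceOf[Rule[LogicalPlan]]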



##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_1AvroDeserializer.scala:
##########
@@ -18,12 +18,19 @@
 package org.apache.spark.sql.avro
 
 import org.apache.avro.Schema
+import org.apache.spark.sql.catalyst.NoopFilters
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types.DataType
 
 class HoodieSpark3_1AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
   extends HoodieAvroDeserializer {
 
-  private val avroDeserializer = new AvroDeserializer(rootAvroType, rootCatalystType)
+  private val avroDeserializer = {
+    val avroRebaseModeInRead = LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))

Review Comment:
   What's the significance of this config? Please add a comment.
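   For reference, the comment could say something along these lines (a sketch; the wording is mine, not from the PR):

       // "spark.sql.legacy.avro.datetimeRebaseModeInRead" controls how ancient
       // dates/timestamps are interpreted on read: Spark 2.x (and Hive) wrote
       // them in the hybrid Julian + Gregorian calendar, while Spark 3.x uses
       // the Proleptic Gregorian calendar. LEGACY rebases such values on read,
       // CORRECTED reads them as-is, and EXCEPTION (the default) fails the
       // read when a value is ambiguous between the two calendars.
       val avroRebaseModeInRead = LegacyBehaviorPolicy.withName(
         SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))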



##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalogUtils.scala:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
+
+trait HoodieSpark3CatalogUtils extends HoodieCatalogUtils {
+
+  /**
+   * Decomposes [[org.apache.spark.sql.connector.expressions.BucketTransform]] extracting its
+   * arguments to accommodate for API changes in Spark 3.3 returning:
+   *
+   * <ol>
+   *   <li>Number of the buckets</li>
+   *   <li>Seq of references (to be bucketed by)</li>
+   *   <li>Seq of sorted references</li>
+   * </ol>
+   */
+  def unapplyBucketTransform(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])]
+

Review Comment:
   nit: extra newline



##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala:
##########
@@ -52,7 +52,7 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 // TODO: we should remove this file when we support datasourceV2 for hoodie on spark3.1x
-case class AlterTableCommand312(table: CatalogTable, changes: Seq[TableChange], changeType: ColumnChangeID) extends RunnableCommand with Logging {
+case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], changeType: ColumnChangeID) extends RunnableCommand with Logging {

Review Comment:
   I see that the refactoring has been done on the assumption that Spark won't break things between patch versions. I think that's a fair assumption, but it can't be guaranteed; just something to be cautious about in the future.



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/hudi/Spark32PlusDefaultSource.scala:
##########
@@ -25,7 +25,7 @@ import org.apache.spark.sql.sources.DataSourceRegister
  *       there are no regressions in performance
  *       Please check out HUDI-4178 for more details
  */
-class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {
+class Spark32PlusDefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {

Review Comment:
   We can remove all the commented-out code in this class.



##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalogUtils.scala:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
+
+trait HoodieSpark3CatalogUtils extends HoodieCatalogUtils {
+
+  /**
+   * Decomposes [[org.apache.spark.sql.connector.expressions.BucketTransform]] extracting its
+   * arguments to accommodate for API changes in Spark 3.3 returning:
+   *
+   * <ol>
+   *   <li>Number of the buckets</li>
+   *   <li>Seq of references (to be bucketed by)</li>
+   *   <li>Seq of sorted references</li>
+   * </ol>
+   */
+  def unapplyBucketTransform(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])]
+
+}
+
+object HoodieSpark3CatalogUtils extends SparkAdapterSupport {
+
+  object MatchBucketTransform {
+    def unapply(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])] =
+      sparkAdapter.getCatalogUtils.asInstanceOf[HoodieSpark3CatalogUtils]
+        .unapplyBucketTransform(t)
+  }
+

Review Comment:
   nit: extra newline
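   For what it's worth, a minimal usage sketch of the extractor (the describe helper is illustrative):

       import org.apache.spark.sql.HoodieSpark3CatalogUtils.MatchBucketTransform
       import org.apache.spark.sql.connector.expressions.Transform

       def describe(t: Transform): String = t match {
         // The version-specific unapplyBucketTransform behind this extractor
         // normalizes the Spark 3.1/3.2/3.3 shapes into a single triple.
         case MatchBucketTransform(numBuckets, refs, sortedRefs) =>
           s"bucket($numBuckets, ${refs.mkString(",")}, sorted by ${sortedRefs.mkString(",")})"
         case _ => "not a bucket transform"
       }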



##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalogUtils.scala:
##########
@@ -15,19 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.expressions.{BucketTransform, NamedReference, Transform}
 
-case class TimeTravelRelation(
-                               table: LogicalPlan,
-                               timestamp: Option[Expression],
-                               version: Option[String]) extends Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
+object HoodieSpark31CatalogUtils extends HoodieSpark3CatalogUtils {
 
-  override def output: Seq[Attribute] = Nil
+  override def unapplyBucketTransform(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])] =
+    t match {
+      case BucketTransform(numBuckets, ref) => Some(numBuckets, Seq(ref), Seq.empty)

Review Comment:
   Why are the sorted refs an empty sequence? Did BucketTransform not support a sorted-refs argument in Spark 3.1 or 3.2?
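   If that's the case, I'd expect the Spark 3.3 counterpart to look roughly like this (a sketch; it assumes the 3.3 BucketTransform extractor also yields sorted columns, and the object name is assumed):

       object HoodieSpark33CatalogUtils extends HoodieSpark3CatalogUtils {

         override def unapplyBucketTransform(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])] =
           t match {
             // Spark 3.3 is assumed to expose both the bucket columns and the
             // sorted columns, whereas 3.1/3.2 only expose a single reference.
             case BucketTransform(numBuckets, refs, sortedRefs) => Some(numBuckets, refs, sortedRefs)
             case _ => None
           }
       }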



##########
hudi-spark-datasource/hudi-spark3.2plus-common/pom.xml:
##########
@@ -0,0 +1,234 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hudi-spark-datasource</artifactId>
+        <groupId>org.apache.hudi</groupId>
+        <version>0.13.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hudi-spark3.2plus-common</artifactId>

Review Comment:
   Renaming should be fine from a usability perspective, since this is a common module that users don't download directly. I don't really see a way out unless Spark stops making breaking changes between minor versions!
   We should just ensure that the bundle packages the right spark common module.
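   Concretely, the shade plugin's artifact set in the bundle pom would need to list every matching common module, e.g. (a sketch; the actual bundle pom may differ):

       <artifactSet>
         <includes>
           <include>org.apache.hudi:hudi-spark3-common</include>
           <include>org.apache.hudi:hudi-spark3.2plus-common</include>
         </includes>
       </artifactSet>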



##########
hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala:
##########
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
-import org.apache.spark.util.Utils
-
-object Spark32DataSourceUtils {

Review Comment:
   Perhaps it's because this was removed that we now need the legacy behavior policy in the Avro deserializer. It would be good to move some of the comments from here over there.



##########
pom.xml:
##########
@@ -1938,7 +1910,8 @@
         <scala.version>${scala12.version}</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
         <hudi.spark.module>hudi-spark3.2.x</hudi.spark.module>
-        <hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
+        <!-- This glob has to include hudi-spark3-common, hudi-spark3.2plus-common -->
+        <hudi.spark.common.modules.glob>hudi-spark3*-common</hudi.spark.common.modules.glob>

Review Comment:
   Does it make sense to avoid the glob pattern and define separate parameters like `hudi.spark2.common.module`, `hudi.spark3.common.module`, `hudi.spark32plus.common.module`, and `hudi.spark33plus.common.module` (in the future)? Something like the sketch below. That would make cherry-picking easier; otherwise we have to keep the glob pattern up to date.
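   A sketch of the proposed alternative (property names as suggested above; values assumed):

       <hudi.spark3.common.module>hudi-spark3-common</hudi.spark3.common.module>
       <hudi.spark32plus.common.module>hudi-spark3.2plus-common</hudi.spark32plus.common.module>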



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org