Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/22 00:36:52 UTC

[hudi] 05/15: Fixed instantiation of the components t/h reflection

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.11-0-apr21-5378-patched
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bc58383dd1232e8957427b89aaf3f5a83fe17b62
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 20 15:20:40 2022 -0700

    Fixed instantiation of the components t/h reflection
---
 .../apache/hudi/common/util/ReflectionUtils.java   |  4 +--
 .../parquet/Spark312HoodieParquetFileFormat.scala  |  8 +++---
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 33 ++++++++++------------
 3 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index ec361d9f9a..13228c440c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -123,7 +123,7 @@ public class ReflectionUtils {
    * @param ctorArgs specific constructor arguments
    * @return new instance of the class
    */
-  public static <T> T newInstanceUnchecked(Class<T> klass, Object ...ctorArgs) {
+  public static <T> T newInstanceUnchecked(Class<T> klass, Object... ctorArgs) {
     Class<?>[] ctorArgTypes = Arrays.stream(ctorArgs).map(Object::getClass).toArray(Class<?>[]::new);
     return newInstanceUnchecked(klass, ctorArgTypes, ctorArgs);
   }
@@ -136,7 +136,7 @@ public class ReflectionUtils {
    * @param ctorArgs specific constructor arguments
    * @return new instance of the class
    */
-  public static <T> T newInstanceUnchecked(Class<T> klass, Class<?>[] ctorArgTypes, Object ...ctorArgs) {
+  public static <T> T newInstanceUnchecked(Class<T> klass, Class<?>[] ctorArgTypes, Object... ctorArgs) {
     try {
       return unsafeCast(klass.getConstructor(ctorArgTypes).newInstance(ctorArgs));
     } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
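
For reference, the hunk above only normalizes the varargs spelling; the helper itself resolves a public constructor from the runtime classes of the supplied arguments (Arrays.stream(ctorArgs).map(Object::getClass) above). A minimal usage sketch from Scala — the Widget class is made up for illustration; only the ReflectionUtils signatures come from this patch:

    import org.apache.hudi.common.util.ReflectionUtils

    class Widget(val name: String, val size: Integer)

    // Constructor parameter types are derived from the arguments' runtime classes
    // (String, Integer), so the matching public Widget constructor is resolved.
    // The second overload instead accepts the parameter types explicitly, for cases
    // where the arguments' runtime classes would not match the declared signature.
    val widget: Widget =
      ReflectionUtils.newInstanceUnchecked(classOf[Widget], "gauge", Integer.valueOf(3))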
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
index 6061edd522..4c9902a3c4 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
@@ -331,8 +331,6 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
 
 object Spark312HoodieParquetFileFormat {
 
-  val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
-
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
@@ -344,8 +342,10 @@ object Spark312HoodieParquetFileFormat {
   }
 
   private def createParquetFilters(args: Any*): ParquetFilters = {
-    val instance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    instance.asInstanceOf[ParquetFilters]
+    // ParquetFilters bears a single ctor (in Spark 3.1)
+    val ctor = classOf[ParquetFilters].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
   }
 
   private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
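
The same pattern recurs in the Spark 3.2 format below, where the NOTE comments spell out why the string-based ReflectionUtils.loadClass lookup was dropped: when a constructor parameter is a Scala Enumeration value, resolving the constructor from the arguments' runtime classes cannot work, so the code falls back to the relative order of declared constructors. A rough sketch of that failure mode — Mode and Reader are made up and only mirror the shape of Spark's ParquetFilters/ParquetReadSupport constructors:

    object Mode extends Enumeration { val Strict, Lenient = Value }

    class Reader(mode: Mode.Value)

    val arg: AnyRef = Mode.Strict
    // arg.getClass is scala.Enumeration$Val, while the declared parameter type erases
    // to scala.Enumeration$Value, so an exact-match lookup by argument class fails:
    //   classOf[Reader].getConstructor(arg.getClass)   // NoSuchMethodException

    // Picking the (single/first) declared constructor and letting newInstance
    // perform the assignability check works instead:
    val reader = classOf[Reader].getConstructors.head
      .newInstance(arg)
      .asInstanceOf[Reader]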
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 99cb83cf51..351203ca58 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
@@ -43,11 +44,9 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
-import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
 
@@ -396,27 +395,25 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
 object Spark32HoodieParquetFileFormat {
 
-  private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
-  private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader"
-  private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport"
-
   private def createParquetFilters(args: Any*): ParquetFilters = {
-    val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    parquetFiltersInstance.asInstanceOf[ParquetFilters]
-  }
-
-  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
-    val vectorizedRecordReader =
-      ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader]
+    // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it
+    //       up by arg types, and have to instead rely on relative order of ctors
+    val ctor = classOf[ParquetFilters].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
   }
 
   private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
-    val parquetReadSupport =
-      ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    parquetReadSupport.asInstanceOf[ParquetReadSupport]
+    // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
+    //       up by arg types, and have to instead rely on relative order of ctors
+    val ctor = classOf[ParquetReadSupport].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetReadSupport]
   }
 
+  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader =
+    ReflectionUtils.newInstanceUnchecked(classOf[VectorizedParquetRecordReader], args.map(_.asInstanceOf[AnyRef]): _*)
+
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {