You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/22 00:36:52 UTC
[hudi] 05/15: Fixed instantiation of the components through reflection
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.11-0-apr21-5378-patched
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit bc58383dd1232e8957427b89aaf3f5a83fe17b62
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 20 15:20:40 2022 -0700
Fixed instantiation of the components through reflection
---
.../apache/hudi/common/util/ReflectionUtils.java | 4 +--
.../parquet/Spark312HoodieParquetFileFormat.scala | 8 +++---
.../parquet/Spark32HoodieParquetFileFormat.scala | 33 ++++++++++------------
3 files changed, 21 insertions(+), 24 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index ec361d9f9a..13228c440c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -123,7 +123,7 @@ public class ReflectionUtils {
* @param ctorArgs specific constructor arguments
* @return new instance of the class
*/
- public static <T> T newInstanceUnchecked(Class<T> klass, Object ...ctorArgs) {
+ public static <T> T newInstanceUnchecked(Class<T> klass, Object... ctorArgs) {
Class<?>[] ctorArgTypes = Arrays.stream(ctorArgs).map(Object::getClass).toArray(Class<?>[]::new);
return newInstanceUnchecked(klass, ctorArgTypes, ctorArgs);
}
@@ -136,7 +136,7 @@ public class ReflectionUtils {
* @param ctorArgs specific constructor arguments
* @return new instance of the class
*/
- public static <T> T newInstanceUnchecked(Class<T> klass, Class<?>[] ctorArgTypes, Object ...ctorArgs) {
+ public static <T> T newInstanceUnchecked(Class<T> klass, Class<?>[] ctorArgTypes, Object... ctorArgs) {
try {
return unsafeCast(klass.getConstructor(ctorArgTypes).newInstance(ctorArgs));
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
index 6061edd522..4c9902a3c4 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
@@ -331,8 +331,6 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
object Spark312HoodieParquetFileFormat {
- val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
-
def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
@@ -344,8 +342,10 @@ object Spark312HoodieParquetFileFormat {
}
private def createParquetFilters(args: Any*): ParquetFilters = {
- val instance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
- instance.asInstanceOf[ParquetFilters]
+ // ParquetFilters bears a single ctor (in Spark 3.1)
+ val ctor = classOf[ParquetFilters].getConstructors.head
+ ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+ .asInstanceOf[ParquetFilters]
}
private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 99cb83cf51..351203ca58 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
@@ -43,11 +44,9 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
-import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
import java.net.URI
@@ -396,27 +395,25 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
object Spark32HoodieParquetFileFormat {
- private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
- private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader"
- private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport"
-
private def createParquetFilters(args: Any*): ParquetFilters = {
- val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
- parquetFiltersInstance.asInstanceOf[ParquetFilters]
- }
-
- private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
- val vectorizedRecordReader =
- ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
- vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader]
+ // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it
+ // up by arg types, and have to instead rely on relative order of ctors
+ val ctor = classOf[ParquetFilters].getConstructors.head
+ ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+ .asInstanceOf[ParquetFilters]
}
private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
- val parquetReadSupport =
- ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
- parquetReadSupport.asInstanceOf[ParquetReadSupport]
+ // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
+ // up by arg types, and have to instead rely on relative order of ctors
+ val ctor = classOf[ParquetReadSupport].getConstructors.head
+ ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+ .asInstanceOf[ParquetReadSupport]
}
+ private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader =
+ ReflectionUtils.newInstanceUnchecked(classOf[VectorizedParquetRecordReader], args.map(_.asInstanceOf[AnyRef]): _*)
+
def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {