You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/29 02:32:14 UTC

[GitHub] [hudi] vinothchandar commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

vinothchandar commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r909140564


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java:
##########
@@ -84,8 +87,19 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
         dedupedKeys = keys.repartition(parallelism);
       }
 
-      HoodieData<HoodieRecord<T>> dedupedRecords =
-          dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+      HoodieData dedupedRecords;
+      if (config.getRecordType() == HoodieRecordType.AVRO) {
+        dedupedRecords =
+            dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+      } else if (config.getRecordType() == HoodieRecordType.SPARK) {
+        dedupedRecords = dedupedKeys.map(key -> {
+          Class<?> recordClazz = ReflectionUtils.getClass("org.apache.hudi.commmon.model.HoodieSparkRecord");
+          Method method = recordClazz.getMethod("empty", HoodieKey.class);
+          return method.invoke(null, key);

Review Comment:
   +1. Actually, anything done per record like this is going to affect perf severely.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -194,6 +194,8 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
         .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withOperationField(config.allowOperationMetadataField())
         .withPartition(operation.getPartitionPath())
+        .withRecordType(config.getRecordType())
+        .withCombiningEngineClassFQN(config.getMergeClass())

Review Comment:
   Rename the builder method?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -766,4 +764,59 @@ object HoodieSparkSqlWriter {
       Map.empty
     }
   }
+
+  private def createHoodieRecordRdd(df: DataFrame, config: HoodieConfig, parameters: Map[String, String], schema: Schema): JavaRDD[HoodieRecord[_]] = {
+    val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val tblName = config.getString(HoodieWriteConfig.TBL_NAME)
+    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+    val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+      WriteOperationType.fromValue(config.getString(OPERATION)).equals(WriteOperationType.UPSERT) ||
+      parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
+    val precombineField = config.getString(PRECOMBINE_FIELD)
+    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps))
+    val partitionCols = HoodieSparkUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
+    val dropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+   HoodieRecord.HoodieRecordType.valueOf(config.getStringOrDefault(HoodieWriteConfig.RECORD_TYPE)) match {

Review Comment:
   ok understood how we are choosing both the paths now



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -140,29 +136,31 @@ public abstract class AbstractHoodieLogRecordReader {
   private Option<String> partitionName;
   // Populate meta fields for the records
   private boolean populateMetaFields = true;
+  // Record type read from log block
+  protected final HoodieRecordType recordType;
 
   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
                                           Schema readerSchema,
                                           String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
                                           int bufferSize, Option<InstantRange> instantRange,
-                                          boolean withOperationField) {
+                                          boolean withOperationField, HoodieRecordType recordType, String combiningEngineClassFQN) {
     this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
-        instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema());
+        instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema(), recordType, combiningEngineClassFQN);
   }
 
   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
                                           Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
                                           boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                           boolean withOperationField, boolean forceFullScan,
-                                          Option<String> partitionName, InternalSchema internalSchema) {
+                                          Option<String> partitionName, InternalSchema internalSchema, HoodieRecordType recordType, String combiningEngineClassFQN) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
     // load class from the payload fully qualified class name
     HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
     this.preCombineField = tableConfig.getPreCombineField();
-    this.mergeClassFQN = tableConfig.getMergeClass();
+    this.mergeClassFQN = combiningEngineClassFQN;

Review Comment:
   rename variables across the board?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java:
##########
@@ -89,6 +91,8 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
         .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
         .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
             HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+        .withRecordType(HoodieRecordType.AVRO)
+        .withCombiningEngineClassFQN(HoodieAvroRecordMerge.class.getName())

Review Comment:
   So we override this from the reader, even though the table property may have `hoodie.merge.class` set to, say, HoodieSparkRecordMerge?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org