You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/19 12:36:51 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6090: [HUDI-4071][Stacked on 5771] Support upsert without record key

nsivabalan commented on code in PR #6090:
URL: https://github.com/apache/hudi/pull/6090#discussion_r924448200


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -236,89 +247,157 @@ object HoodieSparkSqlWriter {
             (writeStatuses, client)
           }
           case _ => { // any other operation
-            // register classes & schemas
-            val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-            sparkContext.getConf.registerKryoClasses(
-              Array(classOf[org.apache.avro.generic.GenericData],
-                classOf[org.apache.avro.Schema]))
-            var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-            val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)
-            var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
-            if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
-              && internalSchemaOpt.isEmpty) {
-              // force apply full schema evolution.
-              internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema))
-            }
-            if (reconcileSchema) {
-              schema = lastestSchema
-            }
-            if (internalSchemaOpt.isDefined) {
-              // Apply schema evolution.
-              val mergedSparkSchema = if (!reconcileSchema) {
-                AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema))
-              } else {
-                // Auto merge write schema and read schema.
-                val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get)
-                AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, lastestSchema.getName))
+            if (WriteOperationType.UPSERT.equals(operation) &&
+              hoodieConfig.getBooleanOrDefault(HoodieTableConfig.UPSERT_WITHOUT_RECORD_KEY)) {
+              // register classes & schemas
+              val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+              sparkContext.getConf.registerKryoClasses(
+                Array(classOf[org.apache.avro.generic.GenericData],
+                  classOf[org.apache.avro.Schema]))
+              var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+              if (reconcileSchema) {
+                schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
               }
-              schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace)
-            }
-
-            if (reconcileSchema && internalSchemaOpt.isEmpty) {
-              schema = lastestSchema
-            }
-            validateSchemaForHoodieIsDeleted(schema)
-            sparkContext.getConf.registerAvroSchemas(schema)
-            log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-            // Convert to RDD[HoodieRecord]
-            val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-              org.apache.hudi.common.util.Option.of(schema))
-            val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
-              operation.equals(WriteOperationType.UPSERT) ||
-              parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
-                HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
-            val hoodieAllIncomingRecords = genericRecords.map(gr => {
-              val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
-              val hoodieRecord = if (shouldCombine) {
-                val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-                  DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                  DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
-                  .asInstanceOf[Comparable[_]]
-                DataSourceUtils.createHoodieRecord(processedRecord,
-                  orderingVal,
-                  keyGenerator.getKey(gr),
-                  hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-              } else {
-                DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+              sparkContext.getConf.registerAvroSchemas(schema)
+              log.info(s"Registered avro schema : ${schema.toString(true)}")
+
+              // Convert to RDD[HoodieRecord]
+              val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
+                org.apache.hudi.common.util.Option.of(schema))
+              val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+                operation.equals(WriteOperationType.UPSERT) ||
+                parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+                  HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
+              val writeSchema =
+                if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema)
+                else schema
+
+              // init write client
+              val client = hoodieWriteClient.getOrElse(
+                DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path, tblName, mapAsJavaMap(parameters))
+              ).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+              // needed to generate commit sequence number
+              val partitionId = client.getEngineContext.getTaskContextSupplier.getPartitionIdSupplier.get
+              val recordIndex = new AtomicLong(1)
+
+              val hoodieAllIncomingRecords = genericRecords.map(gr => {
+                val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
+                val csn = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement())

Review Comment:
   I thought the plan was to reuse the commit sequence number that we generate within the writer (i.e., on the executors). Or is my understanding wrong?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org