Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/15 13:32:25 UTC

[GitHub] [hudi] nsivabalan opened a new pull request #1834: [WIP][HUDI-1013] Adding Bulk Insert V2 implementation

nsivabalan opened a new pull request #1834:
URL: https://github.com/apache/hudi/pull/1834


   ## What is the purpose of the pull request
   
   Adding support for "bulk_insert_dataset" which has better performance compared to existing "bulk_insert". 
   
   ## Brief change log
   
   - Added support for "bulk_insert_dataset" which has better performance compared to existing "bulk_insert". 
   - This patch introduces a new datasource called "org.apache.hudi.internal" and its supporting classes, such as DefaultSource, DataSourceWriter, DataWriterFactory, DataWriter, etc.
   - This patch also introduces HoodieRowCreateHandle, HoodieInternalRowFileWriterFactory, HoodieInternalRowFileWriter, etc to assist in writing InternalRows to parquet. 
   - This patch changes KeyGenerator so that getRecordKey and getPartitionPath are supported with Row for "bulk_insert_dataset". New APIs are added to KeyGenerator with default implementations, so there is no breaking change. All KeyGenerator implementations have been updated accordingly.
   - Added HoodieDatasetBulkInsertHelper to assist in prepping the dataset before calling into datasource write. 
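
The "new APIs with a default implementation" point in the change log can be illustrated with a minimal, self-contained sketch. It does not use Hudi or Spark classes: `SketchKeyGenerator`, `LegacyKeyGenerator`, `RowAwareKeyGenerator` and `KeyGeneratorDemo` are hypothetical names, a `Map` stands in for an Avro record and an `Object[]` stands in for a Spark `Row`. Whether the actual default in this patch throws or does something else is not stated here, so the throwing default is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a key generator base class; not Hudi's actual API.
abstract class SketchKeyGenerator {
  // Pre-existing record-based API that every implementation already provides.
  abstract String getRecordKey(Map<String, Object> record);

  // Newly added row-based API. The default body keeps old subclasses compiling;
  // they only fail if the new write path actually calls it. (Assumed behaviour.)
  String getRecordKeyFromRow(Object[] row) {
    throw new UnsupportedOperationException(
        getClass().getSimpleName() + " does not support row-based record keys");
  }
}

// An implementation written before the new API existed: needs no changes.
class LegacyKeyGenerator extends SketchKeyGenerator {
  @Override
  String getRecordKey(Map<String, Object> record) {
    return String.valueOf(record.get("id"));
  }
}

// An implementation updated for the row-based path: overrides the new method.
class RowAwareKeyGenerator extends LegacyKeyGenerator {
  private final int idPosition;

  RowAwareKeyGenerator(int idPosition) {
    this.idPosition = idPosition;
  }

  @Override
  String getRecordKeyFromRow(Object[] row) {
    return String.valueOf(row[idPosition]);
  }
}

class KeyGeneratorDemo {
  public static void main(String[] args) {
    Map<String, Object> record = new HashMap<>();
    record.put("id", 42);
    Object[] row = {42, "2020/07/15"};

    // The old API keeps working for an untouched implementation.
    System.out.println(new LegacyKeyGenerator().getRecordKey(record));
    // The new API works once an implementation overrides it.
    System.out.println(new RowAwareKeyGenerator(0).getRecordKeyFromRow(row));
  }
}
```

The trade-off in this style is that an implementation which has not been updated still compiles, but only surfaces the gap at runtime when the row-based path is selected.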
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added tests in HoodieSparkSqlWriterSuite to test happy path*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468512749



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
##########
@@ -54,12 +51,17 @@ public String getPartitionPath(GenericRecord record) {
   }
 
   @Override
-  public List<String> getRecordKeyFields() {
-    return recordKeyFields;
+  public List<String> getPartitionPathFields() {
+    return new ArrayList<>();
   }
 
   @Override
-  public List<String> getPartitionPathFields() {
-    return new ArrayList<>();
+  public String getRecordKeyFromRow(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), true);
+  }
+
+  @Override
+  public String getPartitionPathFromRow(Row row) {

Review comment:
       Yes, makes sense. We will revisit after the 0.6.0 release.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469922118



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -85,71 +84,40 @@ public final HoodieKey getKey(GenericRecord record) {
     }).collect(Collectors.toList());
   }
 
-  @Override
-  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
-    // parse simple feilds
-    getRecordKeyFields().stream()
-        .filter(f -> !(f.contains(".")))
-        .forEach(f -> {
-          if (structType.getFieldIndex(f).isDefined()) {
-            recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
-          } else {
-            throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
-          }
-        });
-    // parse nested fields
-    getRecordKeyFields().stream()
-        .filter(f -> f.contains("."))
-        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
-    // parse simple fields
-    if (getPartitionPathFields() != null) {
-      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+  void buildFieldPositionMapIfNeeded(StructType structType) {
+    if (this.structType == null) {
+      // parse simple fields
+      getRecordKeyFields().stream()
+          .filter(f -> !(f.contains(".")))
           .forEach(f -> {
             if (structType.getFieldIndex(f).isDefined()) {
-              partitionPathPositions.put(f,
-                  Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
             } else {
-              partitionPathPositions.put(f, Collections.singletonList(-1));
+              throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
             }
           });
       // parse nested fields
-      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
-          .forEach(f -> partitionPathPositions.put(f,
-              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
-    }
-    this.structName = structName;
-    this.structType = structType;
-    this.recordNamespace = recordNamespace;
-  }
-
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
-  @Override
-  public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionHelper.createConverterToAvro(structType, structName, recordNamespace);
+      getRecordKeyFields().stream()
+          .filter(f -> f.contains("."))
+          .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+      // parse simple fields
+      if (getPartitionPathFields() != null) {
+        getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+            .forEach(f -> {
+              if (structType.getFieldIndex(f).isDefined()) {
+                partitionPathPositions.put(f,
+                    Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              } else {
+                partitionPathPositions.put(f, Collections.singletonList(-1));
+              }
+            });
+        // parse nested fields
+        getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+            .forEach(f -> partitionPathPositions.put(f,
+                RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+      }
+      this.structType = structType;

Review comment:
       May I know where the structType is being used?
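
One reading of the hunk above is that the stored structType doubles as the "already built" marker for the position maps, so the lookup work runs only on the first call. The sketch below shows that build-once pattern in plain Java under simplifying assumptions: a schema is just an ordered list of field names, nested (dotted) fields are ignored, and `FieldPositionCache`, `FieldPositionDemo` and `buildCount` are hypothetical names used only for illustration. As in the hunk, a missing record key field is an error, while a missing partition path field is recorded as position -1.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in: a schema is an ordered list of field names.
class FieldPositionCache {
  private final List<String> recordKeyFields;
  private final List<String> partitionPathFields;
  final Map<String, Integer> recordKeyPositions = new HashMap<>();
  final Map<String, Integer> partitionPathPositions = new HashMap<>();
  private List<String> schema; // stays null until the maps have been built
  int buildCount = 0;          // only here to make the one-time build observable

  FieldPositionCache(List<String> recordKeyFields, List<String> partitionPathFields) {
    this.recordKeyFields = recordKeyFields;
    this.partitionPathFields = partitionPathFields;
  }

  void buildFieldPositionMapIfNeeded(List<String> schemaFieldNames) {
    if (this.schema != null) {
      return; // maps were already built on an earlier call
    }
    for (String f : recordKeyFields) {
      int idx = schemaFieldNames.indexOf(f);
      if (idx < 0) {
        // A record key field must exist in the schema.
        throw new IllegalArgumentException("recordKey value not found for field: \"" + f + "\"");
      }
      recordKeyPositions.put(f, idx);
    }
    for (String f : partitionPathFields) {
      // indexOf returns -1 for a missing field, which is kept as a marker.
      partitionPathPositions.put(f, schemaFieldNames.indexOf(f));
    }
    this.schema = schemaFieldNames; // the stored schema acts as the guard
    buildCount++;
  }
}

class FieldPositionDemo {
  public static void main(String[] args) {
    List<String> schema = Arrays.asList("id", "ts", "region");
    FieldPositionCache cache =
        new FieldPositionCache(Arrays.asList("id"), Arrays.asList("region", "country"));

    cache.buildFieldPositionMapIfNeeded(schema);
    cache.buildFieldPositionMapIfNeeded(schema); // no-op: guard is already set

    System.out.println(cache.recordKeyPositions);
    System.out.println(cache.partitionPathPositions);
    System.out.println(cache.buildCount);
  }
}
```

If the stored schema is not needed for anything else, a plain boolean flag would serve the same purpose as the guard.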







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468565839



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  private List<String> recordKeyFields;

Review comment:
       You mean having all the variables here? Why did we need that change? Not sure if simple and complex should share, though.







[GitHub] [hudi] vinothchandar commented on pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#issuecomment-673310825


   @nsivabalan this is ready. I am going ahead and merging. I also re-ran the benchmark. It seems to clock the same 30 mins against spark.write.parquet.
   
   Please carefully go over the changes I have made in the last commits here and see if anything needs follow-on fixing. Our timelines are tight; we need to do it tomorrow, if at all.





[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468867821



##########
File path: hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
##########
@@ -34,13 +36,28 @@ import org.scalatest.Assertions.fail
 class TestDataSourceDefaults {
 
   val schema = SchemaTestUtil.getComplexEvolvedSchema
+  val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)

Review comment:
       https://issues.apache.org/jira/browse/HUDI-1179







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468311412



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
           throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
         }
       }
-      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: common.util.Option[String], compactionInstantRetVal: common.util.Option[String],
+      writeClientRetVal: HoodieWriteClient[HoodieRecordPayload[Nothing]], tableConfigRetVal: HoodieTableConfig) =
+         if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        // register classes & schemas
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
 
-          // Create the table if not present
-          if (!exists) {
-            //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
-            val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
-              path.get, HoodieTableType.valueOf(tableType),
-              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
-            tableConfig = tableMetaClient.getTableConfig
-          }
+        // Handle various save modes
+        if (mode == SaveMode.ErrorIfExists && exists) {
+          throw new HoodieException(s"hoodie table at $basePath already exists.")
+        }
 
-          // Create a HoodieWriteClient & issue the write.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
-          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+        val (success, commitTime: common.util.Option[String]) =
+          if (mode == SaveMode.Ignore && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+            (false, common.util.Option.ofNullable(instantTime))
+          } else {
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val hoodieRecords =
-            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName,
+              mapAsJavaMap(parameters))
+
+            val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
+            hoodieDF.write.format("org.apache.hudi.internal").option(INSTANT_TIME, instantTime)
+              .options(parameters).save()
+            val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+            val syncHiveSucess = if (hiveSyncEnabled) {
+              log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+              val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+              syncHive(basePath, fs, parameters)
             } else {
-              hoodieAllIncomingRecords
+              true
             }
-
-          if (hoodieRecords.isEmpty()) {
-            log.info("new batch has no new records, skipping...")
-            (true, common.util.Option.empty())
+            (syncHiveSucess, common.util.Option.ofNullable(instantTime))
           }
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-          (writeStatuses, client)
-        } else {
+        (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)
+       } else {
+        val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
+          if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+            // register classes & schemas
+            val structName = s"${tblName}_record"
+            val nameSpace = s"hoodie.${tblName}"
+            sparkContext.getConf.registerKryoClasses(
+              Array(classOf[org.apache.avro.generic.GenericData],
+                classOf[org.apache.avro.Schema]))
+            val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+            sparkContext.getConf.registerAvroSchemas(schema)
+            log.info(s"Registered avro schema : ${schema.toString(true)}")
+
+            // Convert to RDD[HoodieRecord]
+            val keyGenerator = DataSourceUtils.createKeyGenerator(HoodieWriterUtils.toProperties(parameters))
+            val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+            val hoodieAllIncomingRecords = genericRecords.map(gr => {
+              val orderingVal = DataSourceUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
+                .asInstanceOf[Comparable[_]]
+              DataSourceUtils.createHoodieRecord(gr,
+                orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
+            }).toJavaRDD()
+
+            // Handle various save modes
+            if (mode == SaveMode.ErrorIfExists && exists) {
+              throw new HoodieException(s"hoodie table at $basePath already exists.")
+            }
+            if (mode == SaveMode.Ignore && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+              (false, common.util.Option.empty())
+            }
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          // Handle save modes
-          if (mode != SaveMode.Append) {
-            throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
+            // Create a HoodieWriteClient & issue the write.
+            val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
+              mapAsJavaMap(parameters)
+            )
+
+            val hoodieRecords =
+              if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+                DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+              } else {
+                hoodieAllIncomingRecords
+              }
+
+            if (hoodieRecords.isEmpty()) {
+              log.info("new batch has no new records, skipping...")
+              (true, common.util.Option.empty())
+            }
+            client.startCommitWithTime(instantTime)
+            val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
+            (writeStatuses, client)
+          } else {
+
+            // Handle save modes
+            if (mode != SaveMode.Append) {
+              throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+            }
 
-          // Convert to RDD[HoodieKey]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
+            val structName = s"${tblName}_record"
+            val nameSpace = s"hoodie.${tblName}"
+            sparkContext.getConf.registerKryoClasses(
+              Array(classOf[org.apache.avro.generic.GenericData],
+                classOf[org.apache.avro.Schema]))
 
-          if (!exists) {
-            throw new HoodieException(s"hoodie table at $basePath does not exist")
-          }
+            // Convert to RDD[HoodieKey]
+            val keyGenerator = DataSourceUtils.createKeyGenerator(HoodieWriterUtils.toProperties(parameters))
+            val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+            val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
+
+            if (!exists) {
+              throw new HoodieException(s"hoodie table at $basePath does not exist")
+            }
 
-          // Create a HoodieWriteClient & issue the delete.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-            Schema.create(Schema.Type.NULL).toString, path.get, tblName,
-            mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+            // Create a HoodieWriteClient & issue the delete.
+            val client = DataSourceUtils.createHoodieClient(jsc,
+              Schema.create(Schema.Type.NULL).toString, path.get, tblName,
+              mapAsJavaMap(parameters)
+            )
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
+            // Issue deletes
+            client.startCommitWithTime(instantTime)
+            val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
+            (writeStatuses, client)
           }
 
-          // Issue deletes
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
-          (writeStatuses, client)
+        // Check for errors and commit the write.
+        val (writeSuccessful, compactionInstant) =
+          commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
+            operation, jsc)
+        (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
         }
-
-      // Check for errors and commit the write.
-      val (writeSuccessful, compactionInstant) =
-        commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
-          operation, jsc)
-      (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
+      (writeSuccessfulRetVal, commitTimeRetVal, compactionInstantRetVal, writeClientRetVal, tableConfigRetVal)
     }
   }
 
-  /**
-    * Add default options for unspecified write options keys.
-    *
-    * @param parameters
-    * @return
-    */
-  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
-    Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
-      TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
-      PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
-      PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
-      RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
-      PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
-      KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
-      COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
-      INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
-      STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
-      STREAMING_RETRY_INTERVAL_MS_OPT_KEY -> DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL,
-      STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL,
-      HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
-      HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
-      HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
-      HIVE_BASE_FILE_FORMAT_OPT_KEY -> DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL,
-      HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
-      HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
-      HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
-      HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
-      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
-      HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL,
-      HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL,
-      ASYNC_COMPACT_ENABLE_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_VAL
-    ) ++ translateStorageTypeToTableType(parameters)
-  }
+    private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean
 
-  def toProperties(params: Map[String, String]): TypedProperties = {
-    val props = new TypedProperties()
-    params.foreach(kv => props.setProperty(kv._1, kv._2))
-    props
-  }
+    =
+    {
+      val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
+      val hiveConf: HiveConf = new HiveConf()
+      hiveConf.addResource(fs.getConf)
+      new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
+      true
+    }
 
-  private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = {
-    val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
-    val hiveConf: HiveConf = new HiveConf()
-    hiveConf.addResource(fs.getConf)
-    new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
-    true
-  }
+    private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig
+
+    =
+    {
+      val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
+      hiveSyncConfig.basePath = basePath.toString
+      hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY);
+      hiveSyncConfig.usePreApacheInputFormat =
+        parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean)
+      hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
+      hiveSyncConfig.tableName = parameters(HIVE_TABLE_OPT_KEY)
+      hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY)
+      hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY)
+      hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
+      hiveSyncConfig.partitionFields =
+        ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
+      hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
+      hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+      hiveSyncConfig
+    }
 
-  private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = {
-    val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
-    hiveSyncConfig.basePath = basePath.toString
-    hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY);
-    hiveSyncConfig.usePreApacheInputFormat =
-      parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean)
-    hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
-    hiveSyncConfig.tableName = parameters(HIVE_TABLE_OPT_KEY)
-    hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY)
-    hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY)
-    hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
-    hiveSyncConfig.partitionFields =
-      ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
-    hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
-    hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
-    hiveSyncConfig
-  }
+    private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
+                                               parameters: Map[String, String],
+                                               client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
+                                               tableConfig: HoodieTableConfig,
+                                               instantTime: String,
+                                               basePath: Path,
+                                               operation: String,
+                                               jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String])
+
+    =
+    {
+      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+      if (errorCount == 0) {
+        log.info("No errors. Proceeding to commit the write.")
+        val metaMap = parameters.filter(kv =>
+          kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+        val commitSuccess = if (metaMap.isEmpty) {
+          client.commit(instantTime, writeStatuses)
+        } else {
+          val extraMetadata: util.Map[String, String] = new util.HashMap[String, String](mapAsJavaMap(metaMap));
+          client.commit(instantTime, writeStatuses, common.util.Option.of(extraMetadata))
+        }
 
-  private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
-                                             parameters: Map[String, String],
-                                             client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
-                                             tableConfig: HoodieTableConfig,
-                                             instantTime: String,
-                                             basePath: Path,
-                                             operation: String,
-                                             jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
-    val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-    if (errorCount == 0) {
-      log.info("No errors. Proceeding to commit the write.")
-      val metaMap = parameters.filter(kv =>
-        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-      val commitSuccess = if (metaMap.isEmpty) {
-        client.commit(instantTime, writeStatuses)
-      } else {
-        client.commit(instantTime, writeStatuses,
-          common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-      }
+        if (commitSuccess) {
+          log.info("Commit " + instantTime + " successful!")
+        }
+        else {
+          log.info("Commit " + instantTime + " failed!")
+        }
 
-      if (commitSuccess) {
-        log.info("Commit " + instantTime + " successful!")
-      }
-      else {
-        log.info("Commit " + instantTime + " failed!")
-      }
+        val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
+        val compactionInstant: common.util.Option[java.lang.String] =
+          if (asyncCompactionEnabled) {
+            client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+          } else {
+            common.util.Option.empty()
+          }
 
-      val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
-      val compactionInstant : common.util.Option[java.lang.String] =
-      if (asyncCompactionEnabled) {
-        client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-      } else {
-        common.util.Option.empty()
-      }
+        log.info(s"Compaction Scheduled is $compactionInstant")
+        val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+        val syncHiveSucess = if (hiveSyncEnabled) {
+          log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+          val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+          syncHive(basePath, fs, parameters)
+        } else {
+          true
+        }
 
-      log.info(s"Compaction Scheduled is $compactionInstant")
-      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-      val syncHiveSucess = if (hiveSyncEnabled) {
-        log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
-        val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-        syncHive(basePath, fs, parameters)
+        log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
+        if (!asyncCompactionEnabled) {
+          client.close()
+        }
+        (commitSuccess && syncHiveSucess, compactionInstant)
       } else {
-        true
-      }
-
-      log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
-      if (!asyncCompactionEnabled) {
-        client.close()
-      }
-      (commitSuccess && syncHiveSucess, compactionInstant)
-    } else {
-      log.error(s"$operation failed with $errorCount errors :")
-      if (log.isTraceEnabled) {
-        log.trace("Printing out the top 100 errors")
-        writeStatuses.rdd.filter(ws => ws.hasErrors)
-          .take(100)
-          .foreach(ws => {
-            log.trace("Global error :", ws.getGlobalError)
-            if (ws.getErrors.size() > 0) {
-              ws.getErrors.foreach(kt =>
-                log.trace(s"Error for key: ${kt._1}", kt._2))
-            }
-          })
+        log.error(s"$operation failed with $errorCount errors :")
+        if (log.isTraceEnabled) {
+          log.trace("Printing out the top 100 errors")
+          writeStatuses.rdd.filter(ws => ws.hasErrors)
+            .take(100)
+            .foreach(ws => {
+              log.trace("Global error :", ws.getGlobalError)
+              if (ws.getErrors.size() > 0) {
+                ws.getErrors.foreach(kt =>
+                  log.trace(s"Error for key: ${kt._1}", kt._2))
+              }
+            })
+        }
+        (false, common.util.Option.empty())
       }
-      (false, common.util.Option.empty())
     }
-  }
 
-  private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
-                                       tableConfig: HoodieTableConfig,
-                                       parameters: Map[String, String], configuration: Configuration) : Boolean = {
-    log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
-    if (!client.getConfig.isInlineCompaction
-      && parameters.get(ASYNC_COMPACT_ENABLE_KEY).exists(r => r.toBoolean)) {
-      tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
-    } else {
-      false
+    private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],

Review comment:
       These look like formatting-only changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469299509



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -177,4 +191,26 @@ private long convertLongTimeToMillis(Long partitionVal) {
     }
     return MILLISECONDS.convert(partitionVal, timeUnit);
   }
+
+  @Override
+  public String getRecordKey(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), false);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    Object fieldVal = null;
+    Object partitionPathFieldVal =  RowKeyGeneratorHelper.getNestedFieldVal(row, getPartitionPathPositions().get(getPartitionPathFields().get(0)));
+    try {
+      if (partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
+          || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+        fieldVal = 1L;
+      } else {
+        fieldVal = partitionPathFieldVal;
+      }
+      return getPartitionPath(fieldVal);
+    } catch (ParseException e) {

Review comment:
       Can we switch this to catch Exception, so it stays in sync with the GenericRecord behavior?
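   To illustrate the difference, here is a minimal standalone sketch, not the actual key generator code. PartitionPathSketch, parseToEpochDay and KeyGenException are hypothetical names. With a narrow catch of ParseException, a null input escapes as a raw NullPointerException; catching Exception wraps every failure in one exception type.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical stand-in for a domain exception raised on key-generation failures.
class KeyGenException extends RuntimeException {
  KeyGenException(String message, Throwable cause) {
    super(message, cause);
  }
}

class PartitionPathSketch {

  // Parses "yyyy-MM-dd" (UTC) into days since epoch.
  // Throws ParseException on malformed text and NullPointerException on null input.
  static long parseToEpochDay(Object value) throws ParseException {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
    fmt.setLenient(false);
    return fmt.parse(value.toString()).getTime() / 86_400_000L;
  }

  // Narrow catch: only ParseException is wrapped; a NullPointerException escapes as-is.
  static long narrow(Object value) {
    try {
      return parseToEpochDay(value);
    } catch (ParseException e) {
      throw new KeyGenException("Unable to parse partition field: " + value, e);
    }
  }

  // Broad catch: every failure surfaces as the same exception type.
  static long broad(Object value) {
    try {
      return parseToEpochDay(value);
    } catch (Exception e) {
      throw new KeyGenException("Unable to parse partition field: " + value, e);
    }
  }
}
```

   Callers of broad() only need to handle KeyGenException, which is the kind of consistency between the Row and GenericRecord paths that the comment asks for.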

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,

Review comment:
       Same issue here: the field may not be found in structType. Something like:
   ```
        .forEach(f -> {
          if (structType.getFieldIndex(f).isDefined()) {
            partitionPathPositions.put(f,
                Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
          } else {
            partitionPathPositions.put(f, Collections.singletonList(-1));
          }
        });
   ```

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {
+      converterFn = AvroConversionHelper.createConverterToAvro(structType, structName, recordNamespace);
+    }
+    GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+    return getKey(genericRecord).getRecordKey();
+  }
+
+  /**
+   * Fetch partition path from {@link Row}.
+   * @param row instance of {@link Row} from which partition path is requested
+   * @return the partition path of interest from {@link Row}.
+   */
+  @Override
+  public String getPartitionPath(Row row) {
+    if (null != converterFn) {

Review comment:
       It would be good to add a precondition check here that init (initializeRowKeyGenerator) has been called.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       It would be good to add a precondition check here that init (initializeRowKeyGenerator) has been called.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));

Review comment:
       There could be a bug here if the field is not found in structType. I fixed this in RowKeyGeneratorHelper for nested fields, but missed it here.
   
   Something like:
   ```
    .forEach(f -> {
      if (structType.getFieldIndex(f).isDefined()) {
        recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
      } else {
        throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
      }
    });
   ```
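   For reference, a standalone sketch of the two lookup policies for simple (non-nested) fields. It assumes a plain list of column names as a stand-in for Spark's StructType; FieldPositionSketch, resolve and the strict flag are hypothetical and not part of this PR. Strict mode fails fast, as suggested for record key fields; lenient mode records -1 for a missing field, as suggested for partition path fields.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FieldPositionSketch {

  // Maps each requested top-level field to its position in the schema.
  // strict = true:  a missing field is an error (record key fields).
  // strict = false: a missing field is recorded as -1 (partition path fields).
  static Map<String, List<Integer>> resolve(List<String> schemaFields,
                                            List<String> requested,
                                            boolean strict) {
    Map<String, List<Integer>> positions = new LinkedHashMap<>();
    for (String field : requested) {
      int idx = schemaFields.indexOf(field); // -1 when the field is absent
      if (idx < 0 && strict) {
        throw new IllegalArgumentException("recordKey value not found for field: \"" + field + "\"");
      }
      positions.put(field, Collections.singletonList(idx));
    }
    return positions;
  }
}
```

   Keeping both policies behind one helper would make the difference between record key and partition path handling explicit at the call site.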

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {
+      converterFn = AvroConversionHelper.createConverterToAvro(structType, structName, recordNamespace);
+    }
+    GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+    return getKey(genericRecord).getRecordKey();
+  }
+
+  /**
+   * Fetch partition path from {@link Row}.
+   * @param row instance of {@link Row} from which partition path is requested
+   * @return the partition path of interest from {@link Row}.
+   */
+  @Override
+  public String getPartitionPath(Row row) {
+    if (null != converterFn) {

Review comment:
       If you plan to add such a precondition, make sure every built-in key generator calls it, since they may override getRecordKey(Row) and getPartitionPath(Row).
   ```
     protected void preConditionCheckForRowInit(){
       if(!isRowInitCalled()){
         throw new IllegalStateException("KeyGenerator#initializeRowKeyGenerator should have been invoked before this method ");
       }
     }
   ```
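       A minimal sketch of why each override has to repeat the guard. The class names here are hypothetical stand-ins, not Hudi's API, and a List<Object> stands in for a Spark Row:

```java
import java.util.List;

// Hypothetical stand-in for a key generator base class; not Hudi's actual API.
abstract class RowKeyGenSketch {
  private boolean rowInitCalled = false;

  // Plays the role of initializeRowKeyGenerator(...): records that setup happened.
  void initializeRowKeyGenerator() {
    rowInitCalled = true;
  }

  // Guard that every Row-based entry point is expected to call first.
  protected final void preConditionCheckForRowInit() {
    if (!rowInitCalled) {
      throw new IllegalStateException(
          "initializeRowKeyGenerator should have been invoked before this method");
    }
  }

  abstract String getRecordKey(List<Object> row);
}

// A subclass that overrides getRecordKey bypasses any check in the parent,
// so it has to call the guard itself.
final class SimpleRowKeyGenSketch extends RowKeyGenSketch {
  @Override
  String getRecordKey(List<Object> row) {
    preConditionCheckForRowInit();
    return String.valueOf(row.get(0));
  }
}
```

       Because the guard lives in the base class but the entry points are overridable, forgetting the call in one subclass would silently skip the check for that generator.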
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468879180



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
+ * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link InternalRow}.
+ */
+public class HoodieInternalRow extends InternalRow {
+
+  private String commitTime;
+  private String commitSeqNumber;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private InternalRow row;
+
+  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
+      String fileName, InternalRow row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+  }
+
+  @Override
+  public int numFields() {
+    return row.numFields();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = null;
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = null;
+          break;
+        }
+        case 2: {
+          this.recordKey = null;
+          break;
+        }
+        case 3: {
+          this.partitionPath = null;
+          break;
+        }
+        case 4: {
+          this.fileName = null;
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.setNullAt(i);

Review comment:
       I had the same doubt when I first started reviewing this, which is why I added some Javadoc for this class. The row includes the meta columns as well; they are just served from memory instead of being fetched from the row. @bvaradar did some analysis before arriving at this design.
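       To illustrate the interception idea, here is a simplified sketch. It assumes five meta columns (matching the cases 0 to 4 in the switch above) and uses a List<Object> in place of InternalRow; the class name is made up for this example:

```java
import java.util.List;

// Simplified stand-in for the wrapper row; not the real HoodieInternalRow.
final class MetaColumnRowSketch {
  static final int META_COLUMN_COUNT = 5;

  // commit time, commit seq number, record key, partition path, file name
  private final String[] metaColumns;
  // Full row, which still has slots for the meta columns at ordinals 0..4.
  private final List<Object> row;

  MetaColumnRowSketch(String[] metaColumns, List<Object> row) {
    if (metaColumns.length != META_COLUMN_COUNT) {
      throw new IllegalArgumentException("Expected " + META_COLUMN_COUNT + " meta columns");
    }
    this.metaColumns = metaColumns.clone();
    this.row = row;
  }

  // Field count comes from the wrapped row because it already includes the meta slots.
  int numFields() {
    return row.size();
  }

  // Meta ordinals are answered from the local copy; everything else is delegated.
  Object get(int ordinal) {
    return ordinal < META_COLUMN_COUNT ? metaColumns[ordinal] : row.get(ordinal);
  }

  void setNullAt(int ordinal) {
    if (ordinal < META_COLUMN_COUNT) {
      metaColumns[ordinal] = null;
    } else {
      row.set(ordinal, null);
    }
  }
}
```

       Since ordinals are shared between the wrapper and the wrapped row, delegating with the same index for non-meta columns stays correct.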







[GitHub] [hudi] vinothchandar merged pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar merged pull request #1834:
URL: https://github.com/apache/hudi/pull/1834


   





[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r466385801



##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Base class for all the built-in key generators. Contains methods structured for
- * code reuse amongst them.
- */
-public abstract class BuiltinKeyGenerator extends KeyGenerator {

Review comment:
       Note to reviewer: moved this file to hudi-spark, as this needs access to AvroConversionUtils







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468932578



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();

Review comment:
       StructType is just the schema. For record key and partition path fields, we parse the StructType and store the chain of positions (if nested). I don't think we can get away without storing the positions.
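       A rough sketch of what "chain of positions" means, written against a toy schema model instead of Spark's StructType. The Field class and both helper methods are hypothetical, and nested lists stand in for nested Rows:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NestedPositionSketch {
  // Minimal schema node: a name plus child fields (empty for leaf columns).
  static final class Field {
    final String name;
    final List<Field> children;

    Field(String name, Field... children) {
      this.name = name;
      this.children = Arrays.asList(children);
    }
  }

  // Resolve a dotted path such as "address.city" to one index per nesting level.
  static List<Integer> resolvePositions(List<Field> schema, String dottedPath) {
    List<Integer> positions = new ArrayList<>();
    List<Field> level = schema;
    for (String part : dottedPath.split("\\.")) {
      int idx = -1;
      for (int i = 0; i < level.size(); i++) {
        if (level.get(i).name.equals(part)) {
          idx = i;
          break;
        }
      }
      if (idx < 0) {
        throw new IllegalArgumentException("Field not found: " + dottedPath);
      }
      positions.add(idx);
      level = level.get(idx).children;
    }
    return positions;
  }

  // Walk a row using the cached positions, descending one level per index.
  static Object valueAt(List<?> row, List<Integer> positions) {
    Object current = row;
    for (int pos : positions) {
      current = ((List<?>) current).get(pos);
    }
    return current;
  }
}
```

       The schema is parsed once per field, and each row lookup then only follows integer indices, which is the reason for caching the positions up front.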
   







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r458736979



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -670,7 +670,9 @@ public Builder withPath(String basePath) {
     }
 
     public Builder withSchema(String schemaStr) {
-      props.setProperty(AVRO_SCHEMA, schemaStr);
+      if (null != schemaStr) {

Review comment:
       we should probably assert that this is not null?
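       One possible shape for such an assertion, sketched with java.util.Objects.requireNonNull. The builder class and the property key below are assumptions for illustration, not the actual HoodieWriteConfig code:

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical builder; only the null-handling pattern is the point here.
final class WriteConfigBuilderSketch {
  // Key name assumed for illustration.
  static final String AVRO_SCHEMA = "hoodie.avro.schema";

  private final Properties props = new Properties();

  WriteConfigBuilderSketch withSchema(String schemaStr) {
    // Fail fast with a clear message instead of silently skipping the property.
    Objects.requireNonNull(schemaStr, "schemaStr must not be null");
    props.setProperty(AVRO_SCHEMA, schemaStr);
    return this;
  }

  String getSchema() {
    return props.getProperty(AVRO_SCHEMA);
  }
}
```

       The trade-off is that callers which legitimately have no schema yet would need a separate code path rather than passing null.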

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Hoodie's internal write status used in datasource implementation of bulk insert.
+ */
+public class HoodieInternalWriteStatus implements Serializable {

Review comment:
       can we keep this in `hudi-spark`?







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468332027



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
           throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
         }
       }
-      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: common.util.Option[String], compactionInstantRetVal: common.util.Option[String],

Review comment:
       This whole block is not indented at the right level. I am going to try to apply the changes from this file line by line onto the latest file on master.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468884350



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -177,4 +191,26 @@ private long convertLongTimeToMillis(Long partitionVal) {
     }
     return MILLISECONDS.convert(partitionVal, timeUnit);
   }
+
+  @Override
+  public String getRecordKey(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), false);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    Object fieldVal = null;
+    Object partitionPathFieldVal =  RowKeyGeneratorHelper.getNestedFieldVal(row, getPartitionPathPositions().get(getPartitionPathFields().get(0)));

Review comment:
       Yes, this extends SimpleKeyGenerator. Also, we have a special case for the partition path in case the field is not found; I couldn't find a better way to do it. The position lookup returns -1, and when parsing the actual Row we return DEFAULT_PARTITION_PATH.
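       The fallback described here could look roughly like the sketch below. The constant's value, the helper names, and the use of a List<Object> instead of a Row are all assumptions made for this example:

```java
import java.util.List;

final class PartitionPathFallbackSketch {
  // Placeholder value; the real constant is defined in the key generator classes.
  static final String DEFAULT_PARTITION_PATH = "default";
  static final int FIELD_NOT_FOUND = -1;

  // Schema-time lookup: returns -1 when the partition path field is absent.
  static int positionOf(List<String> fieldNames, String field) {
    return fieldNames.indexOf(field);
  }

  // Row-time lookup: a missing position maps to the default partition path.
  static String partitionPath(List<Object> row, int position) {
    if (position == FIELD_NOT_FOUND) {
      return DEFAULT_PARTITION_PATH;
    }
    return String.valueOf(row.get(position));
  }
}
```

       Using a sentinel position keeps the per-row path free of exceptions, at the cost of every reader of the positions map having to know about -1.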







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469401193



##########
File path: hudi-client/src/test/java/org/apache/hudi/testutils/HoodieDatasetTestUtils.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+
+/**
+ * Dataset test utils.
+ */
+public class HoodieDatasetTestUtils {
+
+  public static final StructType STRUCT_TYPE = new StructType(new StructField[] {

Review comment:
       We need to clean all this up and make things more generic. It's okay for now.







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468745957



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple fields
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       As far as I can tell, this is private, set to null by default, and not assigned anywhere else, so we will never pass the `if (null != ..)` check. I think this should be `if (null == converterFn)` if the intention was lazy initialization.
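       For reference, a small sketch of the lazy-initialization pattern with the check the right way round. It uses java.util.function.Function and a placeholder converter rather than the Scala Function1 and AvroConversionHelper from the patch; the class name and build counter are only there for illustration:

```java
import java.util.function.Function;

final class LazyConverterSketch {
  // transient: rebuilt on first use after deserialization instead of being serialized.
  private transient Function<Object, Object> converterFn = null;
  private int buildCount = 0;

  private Function<Object, Object> getConverter() {
    // Build only when the converter is missing; "null !=" here would never initialize it.
    if (null == converterFn) {
      converterFn = value -> "converted:" + value; // placeholder for the real converter
      buildCount++;
    }
    return converterFn;
  }

  Object convert(Object row) {
    return getConverter().apply(row);
  }

  int getBuildCount() {
    return buildCount;
  }
}
```

       With the inverted check, the first call would apply a null function and fail with a NullPointerException, which matches the concern raised above.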

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {

Review comment:
       note to self: need to understand this better and see if we can simplify

##########
File path: hudi-client/src/test/java/org/apache/hudi/testutils/HoodieDatasetTestUtils.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+
+/**
+ * Dataset test utils.
+ */
+public class HoodieDatasetTestUtils {

Review comment:
       This is a misleading name and needs to be renamed. It's unclear whether it refers to a Hoodie dataset or to the Spark Dataset framework.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
##########
@@ -35,6 +35,7 @@
   // bulk insert
   BULK_INSERT("bulk_insert"),
   BULK_INSERT_PREPPED("bulk_insert_prepped"),
+  BULK_INSERT_DATASET("bulk_insert_dataset"),

Review comment:
       Need to understand why this is needed. I believe we pick a different mode for the writer path based on it. We should use a config and not overload the operation type further, if possible.

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
##########
@@ -51,4 +53,32 @@ protected KeyGenerator(TypedProperties config) {
     throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
         + "Please override this method in your custom key generator.");
   }
+
+  /**
+   * Initializes {@link KeyGenerator} for {@link Row} based operations.
+   * @param structType structype of the dataset.
+   * @param structName struct name of the dataset.
+   * @param recordNamespace record namespace of the dataset.
+   */
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {

Review comment:
       Can this just be passed to the `getRecordKey()` methods, or to an overloaded constructor? Sticking a random `init()` method here is not very desirable.
   
   Overall, this ties the KeyGenerator tightly to Spark. For example, when we do Flink, writing a key generator would require a Spark dependency in a Flink job. This needs more thought.
   
   cc @bvaradar @leesf @nsivabalan 
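
   One way to picture the decoupling raised above is a key generator that reads fields through a small accessor interface rather than a Spark `Row`. This is only a sketch: `FieldAccessor`, `MapAccessor` and `EngineNeutralKeyGenerator` are hypothetical names, not Hudi classes, and a `Map` stands in for an Avro record or a Spark row.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical abstraction: the key generator only needs "give me the value of this field".
interface FieldAccessor {
  Object get(String fieldName);
}

// Hypothetical adapter; a Spark or Flink module would supply its own accessor.
class MapAccessor implements FieldAccessor {
  private final Map<String, Object> values;

  MapAccessor(Map<String, Object> values) {
    this.values = values;
  }

  @Override
  public Object get(String fieldName) {
    return values.get(fieldName);
  }
}

// Hypothetical key generator with no engine-specific types and no separate init() step.
class EngineNeutralKeyGenerator {
  private final List<String> recordKeyFields;

  EngineNeutralKeyGenerator(List<String> recordKeyFields) {
    this.recordKeyFields = recordKeyFields;
  }

  // Joins "field:value" pairs in the configured field order.
  String getRecordKey(FieldAccessor record) {
    return recordKeyFields.stream()
        .map(f -> f + ":" + record.get(f))
        .collect(Collectors.joining(","));
  }
}
```

   With this shape, everything the generator needs arrives through the constructor or the call itself, and the engine-specific code stays in the adapter.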

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieRowParquetWriteSupport.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing Row to Parquet.
+ */
+public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
+
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter) {
+    super();
+    Configuration hadoopConf = new Configuration(conf);
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");

Review comment:
       should we be hardcoding these?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
+ * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link InternalRow}.
+ */
+public class HoodieInternalRow extends InternalRow {
+
+  private String commitTime;
+  private String commitSeqNumber;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private InternalRow row;
+
+  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
+      String fileName, InternalRow row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+  }
+
+  @Override
+  public int numFields() {
+    return row.numFields();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = null;
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = null;
+          break;
+        }
+        case 2: {
+          this.recordKey = null;
+          break;
+        }
+        case 3: {
+          this.partitionPath = null;
+          break;
+        }
+        case 4: {
+          this.fileName = null;
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.setNullAt(i);

Review comment:
       Note to self: check if this is indeed correct. I was expecting us to do something like `row.setNullAt(i - 5)`.
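
   The class Javadoc in this hunk says the wrapped `InternalRow` also contains the meta columns, which would explain why no offset of 5 is applied. A minimal sketch of that layout, using a plain `Object[]` as a stand-in for `InternalRow`; `MetaAwareRow` is a hypothetical name, and the count of five meta columns is an assumption taken from the `case 0` to `case 4` branches.

```java
// Hypothetical stand-in for HoodieInternalRow; Object[] plays the role of InternalRow.
class MetaAwareRow {
  static final int META_COLUMN_COUNT = 5; // assumption: matches case 0..4 in the hunk

  private final String[] metaColumns;
  // Wrapped row laid out as [5 meta slots][data columns], so it shares ordinals with the wrapper.
  private final Object[] wrapped;

  MetaAwareRow(String[] metaColumns, Object[] wrapped) {
    this.metaColumns = metaColumns;
    this.wrapped = wrapped;
  }

  int numFields() {
    return wrapped.length;
  }

  void setNullAt(int i) {
    if (i < META_COLUMN_COUNT) {
      metaColumns[i] = null; // meta columns are served from the local copy
    } else {
      wrapped[i] = null;     // same ordinal, no offset, because wrapped includes the meta slots
    }
  }

  Object get(int i) {
    return i < META_COLUMN_COUNT ? metaColumns[i] : wrapped[i];
  }
}
```

   If the wrapped row held only the data columns, the offset would be needed and `numFields()` would have to add five as well.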

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -95,20 +95,20 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = createMetaClient(false);
-    return commit(instantTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
+    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStat(instantTime, stats, extraMetadata);
   }
 
-  private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata, String actionType) {
-
+  // fixme(bulkinsertv2) this name is ughh
+  public boolean commitStat(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {

Review comment:
       got this when compiling
   
   ```
   Error:(433, 16) overloaded method value commit with alternatives:
     (x$1: String,x$2: java.util.List[org.apache.hudi.common.model.HoodieWriteStat],x$3: org.apache.hudi.common.util.Option[java.util.Map[String,String]])Boolean <and>
     (x$1: String,x$2: org.apache.spark.api.java.JavaRDD[org.apache.hudi.client.WriteStatus],x$3: org.apache.hudi.common.util.Option[java.util.Map[String,String]])Boolean
    cannot be applied to (String, org.apache.spark.api.java.JavaRDD[org.apache.hudi.client.WriteStatus], org.apache.hudi.common.util.Option[java.util.HashMap[String,String]])
           client.commit(instantTime, writeStatuses,
   ```
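
   The error looks consistent with generic invariance: an `Option[HashMap[String, String]]` is not an `Option[Map[String, String]]`, and with two `commit` overloads the compiler likely types the argument before it knows which parameter type to aim for. A sketch of the same situation in Java, with `java.util.Optional` standing in for Hudi's `Option`; `CommitApi` and both `commit` bodies are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical API with two overloads that differ only in the second parameter.
class CommitApi {
  static String commit(String instant, List<String> stats, Optional<Map<String, String>> extra) {
    return "list:" + extra.map(Map::size).orElse(0);
  }

  static String commit(String instant, Set<String> statuses, Optional<Map<String, String>> extra) {
    return "set:" + extra.map(Map::size).orElse(0);
  }

  static String demo() {
    HashMap<String, String> meta = new HashMap<>();
    meta.put("k", "v");

    // Optional<HashMap<String, String>> tooNarrow = Optional.of(meta);
    // commit("001", Arrays.asList("s"), tooNarrow); // does not compile: generics are invariant

    // Naming the element type explicitly yields the type both overloads expect.
    Optional<Map<String, String>> extra = Optional.<Map<String, String>>of(meta);
    return commit("001", Arrays.asList("s"), extra);
  }
}
```

   In the Scala caller, the equivalent would presumably be to give the `Option` an explicit `java.util.Map[String, String]` type; using a distinct method name, as `commitStat` does, sidesteps the ambiguity altogether.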

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       Also, this being in `BuiltinKeyGenerator` and not `KeyGenerator` is a problem: it will break all the custom key generators out there when they turn on row-based writing, correct? Should we move this up?
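
   One option for keeping custom key generators working is a default row-based method on the base class that falls back to the existing record-based one. The sketch below uses hypothetical names (`BaseKeyGen`, `CustomKeyGen`) and plain maps and arrays as stand-ins for `GenericRecord` and `Row`; the conversion step is an assumption about how such a fallback could look, not the PR's code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical base class: Map stands in for GenericRecord, Object[] for Row.
abstract class BaseKeyGen {
  // Existing contract that custom key generators already implement.
  abstract String getRecordKey(Map<String, Object> record);

  // New row-based entry point with a default, so existing subclasses need no change.
  String getRecordKey(Object[] row, List<String> fieldNames) {
    Map<String, Object> record = new HashMap<>();
    for (int i = 0; i < fieldNames.size(); i++) {
      record.put(fieldNames.get(i), row[i]);
    }
    // Delegates to the record-based method; slower than a native row path but still functional.
    return getRecordKey(record);
  }
}

// A "custom" generator written before the row-based method existed.
class CustomKeyGen extends BaseKeyGen {
  @Override
  String getRecordKey(Map<String, Object> record) {
    return "user-" + record.get("id");
  }
}
```

   Built-in generators could then override the row-based method with a faster positional lookup, while custom ones inherit the fallback.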

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();

Review comment:
       Is there a way to avoid using positions and use names instead?
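
   For context, the usual reason to cache positions is that a name lookup per row repeats work the schema can answer once. A sketch of the trade-off with a hypothetical `FieldLookup` helper, using a list of field names as a stand-in for `StructType` and an `Object[]` as a stand-in for a row.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper comparing name-based and position-based field access.
class FieldLookup {
  private final List<String> schema; // stand-in for the StructType field names
  private final Map<String, Integer> positions = new HashMap<>();

  FieldLookup(List<String> schema) {
    this.schema = schema;
    // Resolve each name to its position once, up front.
    for (int i = 0; i < schema.size(); i++) {
      positions.put(schema.get(i), i);
    }
  }

  // Name-based: scans the schema on every call.
  Object byName(Object[] row, String field) {
    int idx = schema.indexOf(field);
    if (idx < 0) {
      throw new IllegalArgumentException("Unknown field: " + field);
    }
    return row[idx];
  }

  // Position-based: a single map lookup against the cached index.
  Object byCachedPosition(Object[] row, String field) {
    Integer idx = positions.get(field);
    if (idx == null) {
      throw new IllegalArgumentException("Unknown field: " + field);
    }
    return row[idx];
  }
}
```

   Both paths return the same value. Names read more clearly, while cached positions avoid the repeated scan, which may matter on a per-row hot path.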

##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -267,26 +258,26 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String
   }
 
   public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
-                                                       String instantTime) {
+      String instantTime) {
     return client.delete(hoodieKeys, instantTime);
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
-                                                String payloadClass) throws IOException {
+      String payloadClass) throws IOException {

Review comment:
       Another general rule of thumb: we could always review our own diffs again before submitting to make sure whitespace changes are all intentional. cc @nsivabalan.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
+ * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link InternalRow}.
+ */
+public class HoodieInternalRow extends InternalRow {
+
+  private String commitTime;
+  private String commitSeqNumber;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private InternalRow row;
+
+  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
+      String fileName, InternalRow row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+  }
+
+  @Override
+  public int numFields() {
+    return row.numFields();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = null;
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = null;
+          break;
+        }
+        case 2: {
+          this.recordKey = null;
+          break;
+        }
+        case 3: {
+          this.partitionPath = null;
+          break;
+        }
+        case 4: {
+          this.fileName = null;
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.setNullAt(i);
+    }
+  }
+
+  @Override
+  public void update(int i, Object value) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = value.toString();
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = value.toString();
+          break;
+        }
+        case 2: {
+          this.recordKey = value.toString();
+          break;
+        }
+        case 3: {
+          this.partitionPath = value.toString();
+          break;
+        }
+        case 4: {
+          this.fileName = value.toString();
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.update(i, value);
+    }
+  }
+
+  private String getHoodieColumnVal(int ordinal) {

Review comment:
       rename to `getMetaColumnVal` 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -411,11 +443,11 @@ private[hudi] object HoodieSparkSqlWriter {
 
       val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
       val compactionInstant : common.util.Option[java.lang.String] =
-      if (asyncCompactionEnabled) {
-        client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-      } else {
-        common.util.Option.empty()
-      }
+        if (asyncCompactionEnabled) {

Review comment:
       these are the legit indentations. there is a PR open for scala style.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
##########
@@ -55,21 +51,22 @@ public SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, recordKeyField);
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));

Review comment:
       might be good to assert this out in the constructor itself
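
   A fail-fast check along these lines could live in the constructor. This is a sketch only: `SingleFieldKeyGen` is a hypothetical class and the error message is illustrative.

```java
import java.util.List;

// Hypothetical key generator that supports exactly one record key field.
class SingleFieldKeyGen {
  private final String recordKeyField;

  SingleFieldKeyGen(List<String> recordKeyFields) {
    // Validate once here so a later get(0) cannot fail or silently ignore extra fields.
    if (recordKeyFields == null || recordKeyFields.size() != 1) {
      throw new IllegalArgumentException(
          "Expected exactly one record key field, got: " + recordKeyFields);
    }
    this.recordKeyField = recordKeyFields.get(0);
  }

  String getRecordKeyField() {
    return recordKeyField;
  }
}
```

   A misconfigured field list then surfaces when the generator is built rather than on the first record.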

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -95,20 +95,20 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = createMetaClient(false);
-    return commit(instantTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
+    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStat(instantTime, stats, extraMetadata);
   }
 
-  private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata, String actionType) {
-
+  // fixme(bulkinsertv2) this name is ughh
+  public boolean commitStat(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {

Review comment:
       Looks like we cannot avoid a new public API, so we might as well rename it.

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
##########
@@ -34,13 +36,28 @@ import org.scalatest.Assertions.fail
 class TestDataSourceDefaults {
 
   val schema = SchemaTestUtil.getComplexEvolvedSchema
+  val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)

Review comment:
       @nsivabalan can you please file a JIRA for this?

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -177,4 +191,26 @@ private long convertLongTimeToMillis(Long partitionVal) {
     }
     return MILLISECONDS.convert(partitionVal, timeUnit);
   }
+
+  @Override
+  public String getRecordKey(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), false);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    Object fieldVal = null;
+    Object partitionPathFieldVal =  RowKeyGeneratorHelper.getNestedFieldVal(row, getPartitionPathPositions().get(getPartitionPathFields().get(0)));

Review comment:
       is the .get(0) really fine?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -119,21 +119,19 @@ class DefaultSource extends RelationProvider
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
     val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
-
     if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
     } else {
       HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
     }
-
     new HoodieEmptyRelation(sqlContext, df.schema)
   }
 
   override def createSink(sqlContext: SQLContext,
                           optParams: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
-    val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)

Review comment:
       this needs to be reconciled with changes in the original method. oh my. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -670,7 +670,9 @@ public Builder withPath(String basePath) {
     }
 
     public Builder withSchema(String schemaStr) {
-      props.setProperty(AVRO_SCHEMA, schemaStr);
+      if (null != schemaStr) {

Review comment:
       @bvaradar @nsivabalan why would this be null?
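
   For reference, what the guard avoids: `java.util.Properties.setProperty` rejects a `null` value with a `NullPointerException`. A small sketch with a hypothetical `SchemaProps` helper; the key name `avro.schema` is a placeholder, not necessarily the value of `AVRO_SCHEMA`, and it assumes the builder's `props` behaves like `java.util.Properties`.

```java
import java.util.Properties;

// Hypothetical helper mirroring the null guard in the hunk above.
class SchemaProps {
  static final String SCHEMA_KEY = "avro.schema"; // placeholder key name

  static Properties withSchema(Properties props, String schemaStr) {
    // Properties extends Hashtable, which throws NullPointerException on null values,
    // so a missing schema is skipped instead of stored.
    if (schemaStr != null) {
      props.setProperty(SCHEMA_KEY, schemaStr);
    }
    return props;
  }
}
```

   This shows only the effect of the guard; which caller passes a null schema is still the open question.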

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -129,45 +134,54 @@ public String getPartitionPath(GenericRecord record) {
     if (partitionVal == null) {
       partitionVal = 1L;
     }
+    try {
+      return getPartitionPath(partitionVal);
+    } catch (Exception e) {
+      throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e);
+    }
+  }
 
+  /**
+   * Parse and fetch partition path based on data type.
+   *
+   * @param partitionVal partition path object value fetched from record/row
+   * @return the parsed partition path based on data type
+   * @throws ParseException on any parse exception
+   */
+  private String getPartitionPath(Object partitionVal) throws ParseException {

Review comment:
       Need to look at this line-by-line again and see if it's all good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468866921



##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
##########
@@ -51,4 +53,32 @@ protected KeyGenerator(TypedProperties config) {
     throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
         + "Please override this method in your custom key generator.");
   }
+
+  /**
+   * Initializes {@link KeyGenerator} for {@link Row} based operations.
+   * @param structType structype of the dataset.
+   * @param structName struct name of the dataset.
+   * @param recordNamespace record namespace of the dataset.
+   */
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {

Review comment:
       Yes, we could do that. Since HoodieDatasetBulkInsertHelper is the only class that calls into getRecordKey(Row) and getPartitionPath(Row), it should have access to the StructType and the other args.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469296444



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple fields
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,

Review comment:
       Same here. Please note that for the record key we throw an exception, whereas for the partition path we might need to return DEFAULT_PARTITION_PATH.
   ```
        .forEach(f -> {
               if (structType.getFieldIndex(f).isDefined()) {
                 partitionPathPositions.put(f,
                     Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
               } else {
                 partitionPathPositions.put(f, Collections.singletonList(-1));
               }
             });
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468900813



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple fields
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       I guess this could be a bug. I just realized we don't have tests for this: we have tests for all the built-in key generators, but not for this. Will get it done by tonight. Sorry to have missed it.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468879180



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
+ * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link InternalRow}.
+ */
+public class HoodieInternalRow extends InternalRow {
+
+  private String commitTime;
+  private String commitSeqNumber;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private InternalRow row;
+
+  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
+      String fileName, InternalRow row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+  }
+
+  @Override
+  public int numFields() {
+    return row.numFields();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = null;
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = null;
+          break;
+        }
+        case 2: {
+          this.recordKey = null;
+          break;
+        }
+        case 3: {
+          this.partitionPath = null;
+          break;
+        }
+        case 4: {
+          this.fileName = null;
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.setNullAt(i);

Review comment:
       I had the same doubt when I first started reviewing this, which is why I added some Javadocs to this class. The row will have the meta columns as well; it's just that the meta columns are not fetched from the row but from instance variables in this class. @bvaradar did some analysis before arriving at this.
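
       A minimal sketch of the interception described above, using a plain Object[] in place of Spark's InternalRow. The class name MetaOverlayRow and the constant META_COLUMN_COUNT are hypothetical and only illustrate the idea; this is not the PR's implementation.

```java
/**
 * Illustrative stand-in for the wrapper: the wrapped row still has slots for the
 * meta columns, but access to those ordinals is served from a local copy.
 */
final class MetaOverlayRow {
  // Hypothetical: the first five ordinals are treated as meta columns.
  static final int META_COLUMN_COUNT = 5;

  private final String[] metaValues;
  private final Object[] row; // also contains (unused) slots for the meta columns

  MetaOverlayRow(String[] metaValues, Object[] row) {
    if (metaValues.length != META_COLUMN_COUNT) {
      throw new IllegalArgumentException("Expected " + META_COLUMN_COUNT + " meta values");
    }
    this.metaValues = metaValues.clone();
    this.row = row;
  }

  int numFields() {
    // The wrapped row already accounts for the meta columns.
    return row.length;
  }

  Object get(int ordinal) {
    // Meta ordinals never reach the wrapped row.
    return ordinal < META_COLUMN_COUNT ? metaValues[ordinal] : row[ordinal];
  }

  void setNullAt(int ordinal) {
    if (ordinal < META_COLUMN_COUNT) {
      metaValues[ordinal] = null;
    } else {
      // Same ordinal, no offset: the wrapped row uses the full schema.
      row[ordinal] = null;
    }
  }
}
```

       Because the wrapped row uses the full schema, data ordinals are passed through unchanged, which is why row.setNullAt(i) needs no offset.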







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468901418



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple fields
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       On second thought, yes, it makes sense to move this to KeyGenerator. That's why we had the default impl re-use getRecord(), so that all existing customers can still leverage bulk insert w/ dataset.
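
       The backward-compatible default mentioned here could look roughly like the sketch below: the base class gets a row-based method whose default implementation converts the row and delegates to the existing record-based method, so custom key generators keep working unchanged. All types here (SketchKeyGenerator, LegacyIdKeyGenerator, the List-based row, the Map-based record, toRecord) are hypothetical stand-ins, not Hudi's classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

abstract class SketchKeyGenerator {
  /** Existing API that every custom generator already implements. */
  abstract String getRecordKey(Map<String, Object> record);

  /** New row-based API; by default it converts the row and reuses the record-based path. */
  String getRecordKey(List<Object> row, List<String> fieldNames) {
    return getRecordKey(toRecord(row, fieldNames));
  }

  /** Hypothetical conversion step standing in for a Row-to-record converter. */
  static Map<String, Object> toRecord(List<Object> row, List<String> fieldNames) {
    if (row.size() != fieldNames.size()) {
      throw new IllegalArgumentException("Row and schema sizes differ");
    }
    Map<String, Object> record = new LinkedHashMap<>();
    for (int i = 0; i < row.size(); i++) {
      record.put(fieldNames.get(i), row.get(i));
    }
    return record;
  }
}

/** A "legacy" generator that only knows the record-based API. */
final class LegacyIdKeyGenerator extends SketchKeyGenerator {
  @Override
  String getRecordKey(Map<String, Object> record) {
    return String.valueOf(record.get("id"));
  }
}
```

       Built-in generators can then override the row-based method to skip the per-row conversion, while user-defined ones inherit the slower but correct default.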







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468656221



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -105,6 +104,22 @@ private[hudi] object HoodieSparkSqlWriter {
     } else {
       // Handle various save modes
       handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
+      // Create the table if not present
+      if (!tableExists) {
+        val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
+          HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+          null.asInstanceOf[String])
+        tableConfig = tableMetaClient.getTableConfig
+      }
+
+      // short-circuit if bulk_insert via row is enabled.
+      // scalastyle:off
+      if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
+                                                                                basePath, path, instantTime)
+        return (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)

Review comment:
       nit: can you make the 3rd arg Option.empty? When I put up the PR, I got compilation issues and hence returned an Option of the empty string. I tested Option.empty locally with the latest change and compilation seems to succeed.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468884350



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -177,4 +191,26 @@ private long convertLongTimeToMillis(Long partitionVal) {
     }
     return MILLISECONDS.convert(partitionVal, timeUnit);
   }
+
+  @Override
+  public String getRecordKey(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), false);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    Object fieldVal = null;
+    Object partitionPathFieldVal =  RowKeyGeneratorHelper.getNestedFieldVal(row, getPartitionPathPositions().get(getPartitionPathFields().get(0)));

Review comment:
       Yes, we have a special case for the partition path; I couldn't find a better way to do it. The position will be -1, and when parsing the actual Row we will return DEFAULT_PARTITION_PATH.
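
       A rough sketch of the -1 sentinel flow described above, with plain lists standing in for StructType and Row. The class PartitionPositionSketch, its method names, and the value "default" used for DEFAULT_PARTITION_PATH are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionPositionSketch {
  // Placeholder value for this sketch; the real constant is defined in Hudi's key generator code.
  static final String DEFAULT_PARTITION_PATH = "default";
  static final int MISSING = -1;

  /** Init step: record each partition field's position, or MISSING if the schema lacks it. */
  static Map<String, Integer> resolvePositions(List<String> schemaFields, List<String> partitionFields) {
    Map<String, Integer> positions = new HashMap<>();
    for (String field : partitionFields) {
      // List.indexOf returns -1 when the field is absent, which doubles as the sentinel.
      positions.put(field, schemaFields.indexOf(field));
    }
    return positions;
  }

  /** Per-row step: a MISSING position maps to the default partition path instead of failing. */
  static String partitionPath(List<Object> row, int position) {
    if (position == MISSING) {
      return DEFAULT_PARTITION_PATH;
    }
    return String.valueOf(row.get(position));
  }
}
```

       A record key field, by contrast, would throw at the init step rather than store the sentinel.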







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469401767



##########
File path: hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
##########
@@ -116,20 +165,26 @@ public void testScalar() throws IOException {
 
     // timezone is GMT
     properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
-    HoodieKey hk5 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
+    TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk5 = keyGen.getKey(baseRecord);
     assertEquals(hk5.getPartitionPath(), "2024-10-04 12");
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    keyGen.initializeRowKeyGenerator(structType, testStructName, testNamespace);
+    assertEquals("2024-10-04 12", keyGen.getPartitionPathFromRow(baseRow));
   }
 
   @Test
   public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException {
     baseRecord.put("createTime", "2020-04-01T13:01:33.428Z");
     properties = this.getBaseKeyConfig(
-      "DATE_STRING",
-      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
-      "",
-      "",
-      "yyyyMMddHH",
-      "GMT");
+        "DATE_STRING",

Review comment:
       @nsivabalan is this done?







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468868307



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {

Review comment:
       Responded elsewhere. We could move this to getRecordKey(Row) and getPartitionPath(Row) if need be.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468857201



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -670,7 +670,9 @@ public Builder withPath(String basePath) {
     }
 
     public Builder withSchema(String schemaStr) {
-      props.setProperty(AVRO_SCHEMA, schemaStr);
+      if (null != schemaStr) {

Review comment:
       For Bulk Insert V2, we are passing null in createHoodieConfig(...). Maybe we can change createHoodieConfig() to not call withSchema() for Bulk Insert V2.
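
To make the two options concrete, here is a minimal sketch, not Hudi's actual builder. `ConfigBuilderSketch`, the `avro.schema` key, and the `bulkInsertV2` flag are hypothetical stand-ins; the sketch only contrasts a null-guarded `withSchema` with a caller that skips the call entirely.

```java
import java.util.Properties;

// Hypothetical stand-in for a write-config builder; not the real HoodieWriteConfig.Builder.
final class ConfigBuilderSketch {
  static final String AVRO_SCHEMA = "avro.schema"; // assumed key name, for illustration only

  private final Properties props = new Properties();

  // Option A: guard inside the builder. java.util.Properties rejects null values
  // (setProperty throws NullPointerException), so a null schema is skipped here.
  ConfigBuilderSketch withSchema(String schemaStr) {
    if (schemaStr != null) {
      props.setProperty(AVRO_SCHEMA, schemaStr);
    }
    return this;
  }

  Properties build() {
    return props;
  }

  // Option B: the caller decides whether to call withSchema at all.
  static Properties createConfig(String schemaStr, boolean bulkInsertV2) {
    ConfigBuilderSketch builder = new ConfigBuilderSketch();
    if (!bulkInsertV2) {
      builder.withSchema(schemaStr);
    }
    return builder.build();
  }
}
```

Option B keeps the builder strict for every other caller, while Option A is the smaller change; which one fits better depends on whether other callers can legitimately pass null.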







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469402167



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
##########
@@ -54,12 +51,17 @@ public String getPartitionPath(GenericRecord record) {
   }
 
   @Override
-  public List<String> getRecordKeyFields() {
-    return recordKeyFields;
+  public List<String> getPartitionPathFields() {
+    return new ArrayList<>();
   }
 
   @Override
-  public List<String> getPartitionPathFields() {
-    return new ArrayList<>();
+  public String getRecordKeyFromRow(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), true);
+  }
+
+  @Override
+  public String getPartitionPathFromRow(Row row) {

Review comment:
       Follow-up JIRA for 0.6.1, please :)







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468656221



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -105,6 +104,22 @@ private[hudi] object HoodieSparkSqlWriter {
     } else {
       // Handle various save modes
       handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
+      // Create the table if not present
+      if (!tableExists) {
+        val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
+          HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+          null.asInstanceOf[String])
+        tableConfig = tableMetaClient.getTableConfig
+      }
+
+      // short-circuit if bulk_insert via row is enabled.
+      // scalastyle:off
+      if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
+                                                                                basePath, path, instantTime)
+        return (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)

Review comment:
       nit: can you make the 3rd arg Option.empty? When I put up the PR, I got compilation issues and hence returned an empty string. I tested Option.empty locally with the latest change and compilation seems to succeed.
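
The difference matters to any caller that only checks for presence. The sketch below uses `java.util.Optional` as a stand-in (Hudi's own `Option` class is not used here), and `EmptyVersusBlankSketch` and its method names are hypothetical.

```java
import java.util.Optional;

// java.util.Optional is a stand-in here; this is not Hudi's Option class.
final class EmptyVersusBlankSketch {
  // Mirrors returning an empty string wrapped in an option.
  static Optional<String> blankPlaceholder() {
    return Optional.of("");
  }

  // Mirrors returning "no value".
  static Optional<String> noValue() {
    return Optional.empty();
  }

  // A caller that only checks presence treats the blank placeholder as a real value.
  static String describe(Optional<String> value) {
    return value.isPresent() ? "present:'" + value.get() + "'" : "absent";
  }
}
```

With the blank placeholder, `describe` reports a present (but empty) value, so downstream code would need an extra emptiness check that the empty option avoids.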







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468882653



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple fields
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       While rebasing, I saw that getRecordKeyFieldNames in KeyGenerator throws UnsupportedOperationException, so I went with the same for these methods too. Before the rebase, we had this in KeyGenerator only, so I didn't want to move it without consulting you.
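
The pattern described here (a new API on the base class whose default body throws) can be sketched as follows. This is a simplified, hypothetical hierarchy: `KeyGeneratorSketch` and its subclasses are not the real Hudi classes, a `Map` stands in for an Avro record, and a `String[]` stands in for a Spark `Row`.

```java
import java.util.Map;

// Hypothetical base class; not the real org.apache.hudi.keygen.KeyGenerator.
abstract class KeyGeneratorSketch {
  // Existing API that every implementation must provide.
  abstract String getRecordKey(Map<String, String> record);

  // Newer row-based API: the default body keeps existing subclasses compiling
  // and fails loudly only if the row path is actually exercised.
  String getRecordKey(String[] row) {
    throw new UnsupportedOperationException(
        getClass().getSimpleName() + " does not support row-based record keys");
  }
}

// An older implementation that was written before the row-based API existed.
final class LegacyKeyGenSketch extends KeyGeneratorSketch {
  @Override
  String getRecordKey(Map<String, String> record) {
    return record.get("id");
  }
}

// An implementation that opts in to the row-based path.
final class RowAwareKeyGenSketch extends KeyGeneratorSketch {
  @Override
  String getRecordKey(Map<String, String> record) {
    return record.get("id");
  }

  @Override
  String getRecordKey(String[] row) {
    return row[0]; // assumes the key is the first column, for illustration only
  }
}
```

The trade-off is that unsupported generators fail at run time rather than at compile time, which is the price of not breaking existing custom implementations.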







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468934922



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieRowParquetWriteSupport.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing Row to Parquet.
+ */
+public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
+
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter) {
+    super();
+    Configuration hadoopConf = new Configuration(conf);
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");

Review comment:
       See https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala (lines 94 to 104). Or was your question just about hardcoding these configs?
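
For context on the hardcoding question, the approach in the diff is to copy the incoming configuration and pin the value on the copy. A minimal sketch of that idea, using `java.util.Properties` as a stand-in for Hadoop's `Configuration` (the class and method names are hypothetical):

```java
import java.util.Properties;

// java.util.Properties stands in for org.apache.hadoop.conf.Configuration.
final class PinnedWriterConfigSketch {
  static final String WRITE_LEGACY_FORMAT = "spark.sql.parquet.writeLegacyFormat";

  // Returns a copy with the writer setting pinned; the caller's object is not mutated.
  static Properties withPinnedDefaults(Properties callerConf) {
    Properties copy = new Properties();
    copy.putAll(callerConf);
    copy.setProperty(WRITE_LEGACY_FORMAT, "false");
    return copy;
  }
}
```

Pinning on a copy means a user-supplied value for the same key is silently overridden for this writer, which is the behavior the hardcoding question is really about.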







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469484840



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -297,8 +298,9 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false"
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
-
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_KEY = "hoodie.datasource.compaction.async.enable"
   val DEFAULT_ASYNC_COMPACT_ENABLE_VAL = "true"
+  // Internal configs

Review comment:
       also renaming `ASYNC_COMPACT_ENABLE_KEY` in the process







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469922118



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -85,71 +84,40 @@ public final HoodieKey getKey(GenericRecord record) {
     }).collect(Collectors.toList());
   }
 
-  @Override
-  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
-    // parse simple feilds
-    getRecordKeyFields().stream()
-        .filter(f -> !(f.contains(".")))
-        .forEach(f -> {
-          if (structType.getFieldIndex(f).isDefined()) {
-            recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
-          } else {
-            throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
-          }
-        });
-    // parse nested fields
-    getRecordKeyFields().stream()
-        .filter(f -> f.contains("."))
-        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
-    // parse simple fields
-    if (getPartitionPathFields() != null) {
-      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+  void buildFieldPositionMapIfNeeded(StructType structType) {
+    if (this.structType == null) {
+      // parse simple fields
+      getRecordKeyFields().stream()
+          .filter(f -> !(f.contains(".")))
           .forEach(f -> {
             if (structType.getFieldIndex(f).isDefined()) {
-              partitionPathPositions.put(f,
-                  Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
             } else {
-              partitionPathPositions.put(f, Collections.singletonList(-1));
+              throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
             }
           });
       // parse nested fields
-      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
-          .forEach(f -> partitionPathPositions.put(f,
-              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
-    }
-    this.structName = structName;
-    this.structType = structType;
-    this.recordNamespace = recordNamespace;
-  }
-
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
-  @Override
-  public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionHelper.createConverterToAvro(structType, structName, recordNamespace);
+      getRecordKeyFields().stream()
+          .filter(f -> f.contains("."))
+          .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+      // parse simple fields
+      if (getPartitionPathFields() != null) {
+        getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+            .forEach(f -> {
+              if (structType.getFieldIndex(f).isDefined()) {
+                partitionPathPositions.put(f,
+                    Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              } else {
+                partitionPathPositions.put(f, Collections.singletonList(-1));
+              }
+            });
+        // parse nested fields
+        getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+            .forEach(f -> partitionPathPositions.put(f,
+                RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+      }
+      this.structType = structType;

Review comment:
       May I know where structType is being used? AvroConversionHelper.createConverterToAvro used row.schema(), so we may not need it.
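
In the hunk above, the stored schema appears to act mainly as the "already built" marker for the position maps. A minimal sketch of that lazy, build-once pattern follows. It is hypothetical and simplified: a `List<String>` of column names stands in for a Spark `StructType`, only top-level (non-nested) fields are handled, and `FieldPositionSketch` is not a Hudi class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; a List<String> of column names stands in for a StructType.
final class FieldPositionSketch {
  private final List<String> recordKeyFields;
  private final List<String> partitionPathFields;
  private final Map<String, Integer> recordKeyPositions = new HashMap<>();
  private final Map<String, Integer> partitionPathPositions = new HashMap<>();
  private List<String> schema; // null until the first schema is seen; used only as a marker

  FieldPositionSketch(List<String> recordKeyFields, List<String> partitionPathFields) {
    this.recordKeyFields = recordKeyFields;
    this.partitionPathFields = partitionPathFields;
  }

  void buildFieldPositionMapIfNeeded(List<String> schema) {
    if (this.schema != null) {
      return; // positions were already resolved; later schemas are ignored
    }
    for (String field : recordKeyFields) {
      int idx = schema.indexOf(field);
      if (idx < 0) {
        // A missing record key field is an error.
        throw new IllegalArgumentException("recordKey value not found for field: \"" + field + "\"");
      }
      recordKeyPositions.put(field, idx);
    }
    for (String field : partitionPathFields) {
      // A missing partition path field is tolerated and recorded as -1.
      partitionPathPositions.put(field, schema.indexOf(field));
    }
    this.schema = schema;
  }

  int recordKeyPosition(String field) {
    return recordKeyPositions.get(field);
  }

  int partitionPathPosition(String field) {
    return partitionPathPositions.get(field);
  }
}
```

If the schema is needed only as a marker, a boolean flag would serve the same purpose, which is one way to act on the question raised here.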







[GitHub] [hudi] nsivabalan edited a comment on pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#issuecomment-671892573


   https://github.com/apache/hudi/pull/1834#discussion_r461939866
   : because this is for Row, whereas the existing WriteStats is for HoodieRecords. I guess we should have templatized this too.
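
One way to read "templatized" is a single status class parameterized on the record type. The sketch below is purely illustrative: `WriteStatusSketch` is hypothetical, and its counting rules are assumptions rather than the behavior of Hudi's write status classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic write status; T is the record type (a row, a record wrapper, etc.).
final class WriteStatusSketch<T> {
  private final List<T> failedRecords = new ArrayList<>();
  private long totalRecords;

  void markSuccess(T record) {
    totalRecords++;
  }

  // The cause is ignored in this sketch; a fuller version would likely retain it.
  void markFailure(T record, Throwable cause) {
    totalRecords++; // assumption: failed writes still count toward the total
    failedRecords.add(record);
  }

  long getTotalRecords() {
    return totalRecords;
  }

  int getFailedCount() {
    return failedRecords.size();
  }
}
```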





[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r466385455



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with InternalRow for datasource implementation of bulk insert.
+ */
+public class HoodieRowCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+      StructType structType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    try {
+      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+      String recordKey = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString();
+      HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
+          record);
+      try {
+        fileWriter.writeRow(recordKey, internalRow);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable t) {
+        writeStatus.markFailure(recordKey, t);
+      }
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;
+    }
+  }
+
+  /**
+   * @return {@code true} if this handle can take in more writes, else {@code false}.
+   */
+  public boolean canWrite() {
+    return fileWriter.canWrite();
+  }
+
+  /**
+   * Closes the {@link HoodieRowCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and
+   * status of the writes to this handle.
+   * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
+   * @throws IOException
+   */
+  public HoodieInternalWriteStatus close() throws IOException {
+    fileWriter.close();
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumWrites(writeStatus.getTotalRecords());
+    stat.setNumDeletes(0);
+    stat.setNumInserts(writeStatus.getTotalRecords());
+    stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+    stat.setFileId(fileId);
+    stat.setPath(new Path(writeConfig.getBasePath()), path);
+    long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path);
+    stat.setTotalWriteBytes(fileSizeInBytes);
+    stat.setFileSizeInBytes(fileSizeInBytes);
+    stat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+    HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
+    runtimeStats.setTotalCreateTime(currTimer.endTimer());
+    stat.setRuntimeStats(runtimeStats);
+    writeStatus.setStat(stat);
+    return writeStatus;
+  }
+
+  public String getFileName() {
+    return path.getName();
+  }
+
+  private Path makeNewPath(String partitionPath) {
+    Path path = FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath);
+    try {
+      fs.mkdirs(path); // create a new partition as needed.
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + path, e);
+    }
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, getWriteToken(), fileId,
+        tableConfig.getBaseFileFormat().getFileExtension()));
+  }
+
+  /**
+   * Creates an empty marker file corresponding to storage writer path.
+   *
+   * @param partitionPath Partition path
+   */
+  private void createMarkerFile(String partitionPath, String dataFileName) {

Review comment:
       Note to reviewer: these methods are copied from HoodieWriteHandle for now. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with InternalRow for datasource implementation of bulk insert.
+ */
+public class HoodieRowCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+      StructType structType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, values for meta columns are computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    try {
+      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(

Review comment:
       Note to reviewer: if this statement fails, we have to treat it as a global error rather than a per-record error, since we don't have the record key yet. This differs from how a HoodieRecord write happens. In such cases, rowCreateHandle throws the exception and the caller is expected to close the rowCreateHandle.
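
       The two error levels described above can be sketched in isolation. This is a minimal illustration, not the code in this PR: Status, writeRow and the String[] row are hypothetical stand-ins for HoodieInternalWriteStatus, the file writer and InternalRow. A failure while reading the key is recorded as a global error and rethrown; a failure while writing an already-keyed row is recorded against that key only.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoLevelErrorSketch {

  /** Hypothetical stand-in for the write status kept by the handle. */
  public static class Status {
    public final List<String> succeeded = new ArrayList<>();
    public final List<String> failed = new ArrayList<>();
    public Throwable globalError;
  }

  /** Hypothetical file writer: rejects any key that starts with "bad". */
  public static void writeRow(String key) {
    if (key.startsWith("bad")) {
      throw new IllegalStateException("cannot write " + key);
    }
  }

  /** row[0] plays the role of the record key meta column. */
  public static void write(String[] row, Status status) {
    try {
      // No key is available yet, so a failure here cannot be attributed to a record.
      String key = row[0].trim();
      try {
        writeRow(key);
        status.succeeded.add(key);
      } catch (RuntimeException e) {
        // The key is known: record a per-record failure and keep going.
        status.failed.add(key);
      }
    } catch (RuntimeException ge) {
      // Global error: remember it and let the caller close the handle.
      status.globalError = ge;
      throw ge;
    }
  }
}
```

       A caller would typically wrap write(...) in its own try/finally so that the handle is closed when the global error propagates.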

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {

Review comment:
       Note to reviewer: had to move this class to hudi-spark, since it needs access to AvroConversionUtils for the Row-to-GenericRecord converter function.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -129,45 +134,54 @@ public String getPartitionPath(GenericRecord record) {
     if (partitionVal == null) {
       partitionVal = 1L;
     }
+    try {
+      return getPartitionPath(partitionVal);
+    } catch (Exception e) {
+      throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e);
+    }
+  }
 
+  /**
+   * Parse and fetch partition path based on data type.
+   *
+   * @param partitionVal partition path object value fetched from record/row
+   * @return the parsed partition path based on data type
+   * @throws ParseException on any parse exception
+   */
+  private String getPartitionPath(Object partitionVal) throws ParseException {

Review comment:
       Note to reviewer: no changes here. Just moved the code to a private method for reuse.

##########
File path: hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
##########
@@ -116,20 +165,26 @@ public void testScalar() throws IOException {
 
     // timezone is GMT
     properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
-    HoodieKey hk5 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
+    TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk5 = keyGen.getKey(baseRecord);
     assertEquals(hk5.getPartitionPath(), "2024-10-04 12");
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    keyGen.initializeRowKeyGenerator(structType, testStructName, testNamespace);
+    assertEquals("2024-10-04 12", keyGen.getPartitionPathFromRow(baseRow));
   }
 
   @Test
   public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException {
     baseRecord.put("createTime", "2020-04-01T13:01:33.428Z");
     properties = this.getBaseKeyConfig(
-      "DATE_STRING",
-      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
-      "",
-      "",
-      "yyyyMMddHH",
-      "GMT");
+        "DATE_STRING",

Review comment:
       Note to reviewer: I have yet to add tests for these new methods. I got these as part of the rebase. I also noticed a few other new test classes, one per key generator, after rebasing. I will add tests to those new test classes by tomorrow.

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
##########
@@ -34,13 +36,28 @@ import org.scalatest.Assertions.fail
 class TestDataSourceDefaults {
 
   val schema = SchemaTestUtil.getComplexEvolvedSchema
+  val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)

Review comment:
       Note to reviewer: this is the test class where all key generators are tested for Row apis as well. Found new test classes for each key gen after rebasing. Yet to add tests to these new key gen test classes for Row based apis.

##########
File path: hudi-client/src/test/java/org/apache/hudi/testutils/HoodieDatasetTestUtils.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+
+/**
+ * Dataset test utils.
+ */
+public class HoodieDatasetTestUtils {
+
+  public static final StructType STRUCT_TYPE = new StructType(new StructField[] {

Review comment:
       Note to reviewer: Can't leverage HoodieTestDataGenerator since each record is expected to be in a certain format (meta columns followed by data columns). Hence introduced a new schema for testing "bulk insert dataset".
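
       The "meta columns followed by data columns" layout can be illustrated with plain lists. This is only a sketch: the class and method names are hypothetical, and a List<Object> stands in for a Spark Row. The five column names are Hudi's standard meta fields. Leaving commit time, sequence number and file name empty is an assumption that follows HoodieRowCreateHandle.write above, which reads only the record key and partition path from the incoming row and computes the rest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetaColumnLayoutSketch {

  /** Hudi meta columns, in the order they are expected at the front of each row. */
  public static final List<String> META_COLUMNS = Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name");

  /** Name-to-position lookup, similar in spirit to HOODIE_META_COLUMNS_NAME_TO_POS. */
  public static final Map<String, Integer> NAME_TO_POS = new HashMap<>();

  static {
    for (int i = 0; i < META_COLUMNS.size(); i++) {
      NAME_TO_POS.put(META_COLUMNS.get(i), i);
    }
  }

  /** Builds a row with the meta columns first, then the data columns. */
  public static List<Object> withMetaColumns(String recordKey, String partitionPath, List<Object> dataValues) {
    List<Object> row = new ArrayList<>();
    row.add(null);          // commit time, filled in at write time (assumption)
    row.add(null);          // commit sequence number, filled in at write time (assumption)
    row.add(recordKey);
    row.add(partitionPath);
    row.add(null);          // file name, filled in at write time (assumption)
    row.addAll(dataValues);
    return row;
  }
}
```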

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Base class for all the built-in key generators. Contains methods structured for
- * code reuse amongst them.
- */
-public abstract class BuiltinKeyGenerator extends KeyGenerator {

Review comment:
       Note to reviewer: moved this file to hudi-spark 

##########
File path: hudi-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.STRUCT_TYPE;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getInternalRowWithError;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests {@link HoodieRowCreateHandle}.
+ */
+public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
+
+  private static final Random RANDOM = new Random();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieRowCreateHandle");
+    initPath();
+    initFileSystem();
+    initTestDataGenerator();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testRowCreateHandle() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
+    List<String> fileNames = new ArrayList<>();
+    List<String> fileAbsPaths = new ArrayList<>();
+
+    Dataset<Row> totalInputRows = null;
+    // one round per partition
+    for (int i = 0; i < 5; i++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3];
+
+      // init some args
+      String fileId = UUID.randomUUID().toString();
+      String instantTime = "000";
+
+      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE);
+      int size = 10 + RANDOM.nextInt(1000);
+      // Generate inputs
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+
+      // issue writes
+      HoodieInternalWriteStatus writeStatus = writeAndGetWriteStatus(inputRows, handle);
+
+      fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
+      fileNames.add(handle.getFileName());
+      // verify output
+      assertOutput(writeStatus, size, fileId, partitionPath, instantTime, totalInputRows, fileNames, fileAbsPaths);
+    }
+  }
+
+  /**
+   * Issue some corrupted or wrongly schematized InternalRows after a few valid InternalRows so that a global error is thrown: write batch 1 of valid records, then write batch 2 of invalid records.
+   * A global error should be thrown.
+   */
+  @Test
+  public void testGlobalFailure() throws IOException {

Review comment:
       Note to reviewer: as mentioned above, an error while parsing the partition path or record key results in a global error for the handle, not a per-record/row error. 
   I couldn't repro/test a per-record error. I tried writing a different datatype to one of the data columns, expecting the write to fail, but it didn't. So, as of now, there are no tests for per-record failures. The same applies to RowFileWriter, InternalWriter, etc.

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
##########
@@ -51,4 +53,32 @@ protected KeyGenerator(TypedProperties config) {
     throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
         + "Please override this method in your custom key generator.");
   }
+
+  /**
+   * Initializes {@link KeyGenerator} for {@link Row} based operations.
+   * @param structType struct type of the dataset.
+   * @param structName struct name of the dataset.
+   * @param recordNamespace record namespace of the dataset.
+   */
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {

Review comment:
       Note to reviewer: introduced these new APIs for Row-based KeyGen. All built-in generators implement them. Users with a custom key generator don't need to implement these APIs unless they want to use "bulk_insert_dataset"; if they do, they may have to provide implementations for these methods.
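
       The compatibility argument above can be sketched as follows. This is a simplified illustration with hypothetical classes and signatures (a Map stands in for GenericRecord and an Object[] for Row). It also assumes the default Row-based method throws UnsupportedOperationException, mirroring the bootstrap default visible in the hunk; the actual default in this PR may differ.

```java
import java.util.Map;

public class RowKeyGenDefaultsSketch {

  /** Hypothetical base class: the Row-based method has a default, so old subclasses still compile. */
  public abstract static class BaseKeyGen {
    public abstract String getRecordKey(Map<String, Object> record);

    public String getRecordKeyFromRow(Object[] row) {
      throw new UnsupportedOperationException(
          "Row based record key not supported. Override this method to use bulk_insert_dataset.");
    }
  }

  /** A custom key generator written before the Row-based API existed; needs no changes. */
  public static class LegacyKeyGen extends BaseKeyGen {
    @Override
    public String getRecordKey(Map<String, Object> record) {
      return String.valueOf(record.get("id"));
    }
  }

  /** The same generator, opted in to the Row-based path. */
  public static class RowAwareKeyGen extends LegacyKeyGen {
    @Override
    public String getRecordKeyFromRow(Object[] row) {
      return String.valueOf(row[0]); // assumes the key lives in column 0
    }
  }
}
```

       With this shape, existing jobs keep working on the old write path, and only jobs that switch to the new operation hit the unsupported-operation error until the override is added.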

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Helper class to fetch fields from Row.
+ */
+public class RowKeyGeneratorHelper {
+
+  /**
+   * Generates record key for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param recordKeyFields record key fields as a list
+   * @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields}
+   * @param prefixFieldName {@code true} if field name needs to be prefixed in the returned result. {@code false} otherwise.
+   * @return the record key thus generated
+   */
+  public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, List<Integer>> recordKeyPositions, boolean prefixFieldName) {
+    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+    String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
+      String field = recordKeyFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = recordKeyPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple field
+        Integer fieldPos = fieldPositions.get(0);
+        if (row.isNullAt(fieldPos)) {
+          val = NULL_RECORDKEY_PLACEHOLDER;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = EMPTY_RECORDKEY_PLACEHOLDER;
+          } else {
+            keyIsNullOrEmpty.set(false);
+          }
+        }
+      } else { // nested fields
+        val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString();
+        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          keyIsNullOrEmpty.set(false);
+        }
+      }
+      return prefixFieldName ? (field + ":" + val) : val;
+    }).collect(Collectors.joining(","));
+    if (keyIsNullOrEmpty.get()) {
+      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generates partition path for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param partitionPathFields partition path fields as a list
+   * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
+   * @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields}
+   * @return the generated partition path for the row
+   */
+  public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, List<Integer>> partitionPathPositions) {
+    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
+      String field = partitionPathFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = partitionPathPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple
+        Integer fieldPos = fieldPositions.get(0);
+        // for partition path, if field is not found, index will be set to -1
+        if (fieldPos == -1 || row.isNullAt(fieldPos)) {
+          val = DEFAULT_PARTITION_PATH;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = DEFAULT_PARTITION_PATH;
+          }
+        }
+        if (hiveStylePartitioning) {
+          val = field + "=" + val;
+        }
+      } else { // nested
+        Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field));
+        if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          val = hiveStylePartitioning ? field + "=" + DEFAULT_PARTITION_PATH : DEFAULT_PARTITION_PATH;
+        } else {
+          val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString();
+        }
+      }
+      return val;
+    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
+  }
+
+  /**
+   * Fetch the field value located at the positions requested for.
+   * @param row instance of {@link Row} of interest
+   * @param positions tree style positions where the leaf node needs to be fetched and returned
+   * @return the field value as per the positions requested for.
+   */
+  public static Object getNestedFieldVal(Row row, List<Integer> positions) {
+    if (positions.size() == 1 && positions.get(0) == -1) {

Review comment:
       Note to reviewer: getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) will return -1 for partitionPathIndices if partition path field is not found.
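
       To make the -1 convention concrete, here is a rough sketch of how a dotted field name could be resolved to one position per nesting level, and how a lookup could short-circuit on the sentinel. It is not the code in RowKeyGeneratorHelper: the schema is modeled as an ordered map (field name to nested schema, or null for a leaf), rows are modeled as nested lists, and positionsOf/valueAt are hypothetical names. How a missing record key field is handled is not covered here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class NestedPositionsSketch {

  /** Resolves "a.b.c" to [posOfA, posOfB, posOfC]; returns [-1] if any segment is missing. */
  @SuppressWarnings("unchecked")
  public static List<Integer> positionsOf(Map<String, Object> schema, String dottedField) {
    List<Integer> positions = new ArrayList<>();
    Map<String, Object> current = schema;
    for (String part : dottedField.split("\\.")) {
      if (current == null || !current.containsKey(part)) {
        return Collections.singletonList(-1); // sentinel: field not found
      }
      // Position of the field within the current struct (insertion order of the map).
      positions.add(new ArrayList<>(current.keySet()).indexOf(part));
      current = (Map<String, Object>) current.get(part);
    }
    return positions;
  }

  /** Walks the positions through nested rows; null tells the caller to use a default value. */
  @SuppressWarnings("unchecked")
  public static Object valueAt(List<Object> row, List<Integer> positions) {
    if (positions.size() == 1 && positions.get(0) == -1) {
      return null; // field was not found in the schema
    }
    Object current = row;
    for (int pos : positions) {
      if (current == null) {
        return null; // a parent struct was null
      }
      current = ((List<Object>) current).get(pos);
    }
    return current;
  }
}
```

       In the partition path case, a null result from such a lookup would map to the default partition path rather than raising an error.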

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  private List<String> recordKeyFields;

Review comment:
       Note to reviewer: Have unified code across Simple and Complex key gens. 

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Helper class to fetch fields from Row.
+ */
+public class RowKeyGeneratorHelper {
+
+  /**
+   * Generates record key for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param recordKeyFields record key fields as a list
+   * @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields}
+   * @param prefixFieldName {@code true} if the field name needs to be prefixed in the returned result. {@code false} otherwise.
+   * @return the record key thus generated
+   */
+  public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, List<Integer>> recordKeyPositions, boolean prefixFieldName) {
+    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+    String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
+      String field = recordKeyFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = recordKeyPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple field
+        Integer fieldPos = fieldPositions.get(0);
+        if (row.isNullAt(fieldPos)) {
+          val = NULL_RECORDKEY_PLACEHOLDER;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = EMPTY_RECORDKEY_PLACEHOLDER;
+          } else {
+            keyIsNullOrEmpty.set(false);
+          }
+        }
+      } else { // nested fields
+        val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString();
+        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          keyIsNullOrEmpty.set(false);
+        }
+      }
+      return prefixFieldName ? (field + ":" + val) : val;
+    }).collect(Collectors.joining(","));
+    if (keyIsNullOrEmpty.get()) {
+      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generates partition path for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param partitionPathFields partition path fields as a list
+   * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
+   * @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields}
+   * @return the generated partition path for the row
+   */
+  public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, List<Integer>> partitionPathPositions) {
+    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
+      String field = partitionPathFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = partitionPathPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple
+        Integer fieldPos = fieldPositions.get(0);
+        // for partition path, if field is not found, index will be set to -1
+        if (fieldPos == -1 || row.isNullAt(fieldPos)) {
+          val = DEFAULT_PARTITION_PATH;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = DEFAULT_PARTITION_PATH;
+          }
+        }
+        if (hiveStylePartitioning) {
+          val = field + "=" + val;
+        }
+      } else { // nested
+        Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field));
+        if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          val = hiveStylePartitioning ? field + "=" + DEFAULT_PARTITION_PATH : DEFAULT_PARTITION_PATH;
+        } else {
+          val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString();
+        }
+      }
+      return val;
+    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
+  }
+
+  /**
+   * Fetch the field value located at the positions requested for.
+   * @param row instance of {@link Row} of interest
+   * @param positions tree style positions where the leaf node needs to be fetched and returned
+   * @return the field value as per the positions requested for.
+   */
+  public static Object getNestedFieldVal(Row row, List<Integer> positions) {
+    if (positions.size() == 1 && positions.get(0) == -1) {
+      return DEFAULT_PARTITION_PATH;
+    }
+    int index = 0;
+    int totalCount = positions.size();
+    Row valueToProcess = row;
+    Object toReturn = null;
+
+    while (index < totalCount) {
+      if (index < totalCount - 1) {
+        if (valueToProcess.isNullAt(positions.get(index))) {
+          toReturn = NULL_RECORDKEY_PLACEHOLDER;
+          break;
+        }
+        valueToProcess = (Row) valueToProcess.get(positions.get(index));
+      } else { // last index
+        if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
+          toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
+          break;
+        }
+        toReturn = valueToProcess.getAs(positions.get(index));
+      }
+      index++;
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generate the tree style positions for the field requested for as per the defined struct type.
+   * @param structType schema of interest
+   * @param field field of interest for which the positions are requested for
+   * @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path.
+   * @return the positions of the field as per the struct type.
+   */
+  public static List<Integer> getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) {
+    String[] slices = field.split("\\.");
+    List<Integer> positions = new ArrayList<>();
+    int index = 0;
+    int totalCount = slices.length;
+    while (index < totalCount) {
+      String slice = slices[index];
+      Option<Object> curIndexOpt = structType.getFieldIndex(slice);
+      if (curIndexOpt.isDefined()) {
+        int curIndex = (int) curIndexOpt.get();
+        positions.add(curIndex);
+        final StructField nestedField = structType.fields()[curIndex];
+        if (index < totalCount - 1) {
+          if (!(nestedField.dataType() instanceof StructType)) {
+            if (isRecordKey) {
+              throw new HoodieKeyException("Nested field should be of type StructType " + nestedField);
+            } else {
+              positions = Collections.singletonList(-1);

Review comment:
       Note to reviewer: -1 is returned only in the case of a partition path, so that getNestedFieldVal(Row row, List<Integer> positions) returns DEFAULT_PARTITION_PATH if the partition path field is not found.
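
The sentinel behaviour described above can be illustrated in isolation. This is a rough sketch only: rows are modelled as nested java.util.List values rather than Spark Rows, the class name is hypothetical, and the value of the default partition path constant is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative stand-in only; not the RowKeyGeneratorHelper from this patch.
public class PartitionPathSentinelSketch {

  public static final String DEFAULT_PARTITION_PATH = "default"; // assumed value

  // Walks the tree-style positions down to the leaf value.
  // A single position of -1 is the "field not found" sentinel.
  public static Object nestedValue(List<Object> row, List<Integer> positions) {
    if (positions.size() == 1 && positions.get(0) == -1) {
      return DEFAULT_PARTITION_PATH;
    }
    Object current = row;
    for (int pos : positions) {
      @SuppressWarnings("unchecked")
      List<Object> asRow = (List<Object>) current;
      current = asRow.get(pos);
    }
    return current;
  }

  public static void main(String[] args) {
    // Outer row: [recordKey, nestedStruct], where nestedStruct is [year, month].
    List<Object> row = Arrays.asList("key1", Arrays.asList("2020", "07"));
    System.out.println(nestedValue(row, Arrays.asList(1, 0)));
    System.out.println(nestedValue(row, Collections.singletonList(-1)));
  }
}
```

Resolving the positions once against the schema and reusing them per row avoids a name lookup on every record, and the sentinel lets a missing partition path field degrade to the default path instead of failing the write.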

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Hoodie's internal write status used in datasource implementation of bulk insert.
+ */
+public class HoodieInternalWriteStatus implements Serializable {

Review comment:
       will address all feedback together. 

##########
File path: hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.STRUCT_TYPE;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Unit tests {@link HoodieDataSourceInternalWriter}.
+ */
+public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness {
+
+  private static final Random RANDOM = new Random();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieDataSourceInternalWriter");
+    initPath();
+    initFileSystem();
+    initTestDataGenerator();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testDataSourceWriter() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    String instantTime = "001";
+    // init writer
+    HoodieDataSourceInternalWriter dataSourceInternalWriter =
+        new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+    DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
+
+    List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
+    List<String> partitionPathsAbs = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
+    }
+
+    int size = 10 + RANDOM.nextInt(1000);
+    int batches = 5;
+    Dataset<Row> totalInputRows = null;
+
+    for (int j = 0; j < batches; j++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+      writeRows(inputRows, writer);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+    }
+
+    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+    commitMessages.add(commitMetadata);
+    dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+    metaClient.reloadActiveTimeline();
+    Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
+    // verify output
+    assertOutput(totalInputRows, result, instantTime);
+    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+  }
+
+  @Test
+  public void testMultipleDataSourceWrites() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    int partitionCounter = 0;
+
+    // execute N rounds
+    for (int i = 0; i < 5; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieDataSourceInternalWriter dataSourceInternalWriter =
+          new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+
+      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+      Dataset<Row> totalInputRows = null;
+      DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong());
+
+      int size = 10 + RANDOM.nextInt(1000);
+      int batches = 5; // one batch per partition
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+      commitMessages.add(commitMetadata);
+      dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+      metaClient.reloadActiveTimeline();
+
+      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
+
+      // verify output
+      assertOutput(totalInputRows, result, instantTime);
+      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+    }
+  }
+
+  @Test
+  public void testLargeWrites() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    int partitionCounter = 0;
+
+    // execute N rounds
+    for (int i = 0; i < 3; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieDataSourceInternalWriter dataSourceInternalWriter =
+          new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+
+      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+      Dataset<Row> totalInputRows = null;
+      DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong());
+
+      int size = 10000 + RANDOM.nextInt(10000);
+      int batches = 3; // one batch per partition
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+      commitMessages.add(commitMetadata);
+      dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+      metaClient.reloadActiveTimeline();
+
+      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
+
+      // verify output
+      assertOutput(totalInputRows, result, instantTime);
+      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+    }
+  }
+
+  /**
+   * Tests that DataSourceWriter.abort() discards the records written by the aborted batch. Write and commit batch1, then write and abort batch2; a read of the entire dataset should show only records from batch1.
+   * commit batch1
+   * abort batch2
+   * verify only records from batch1 are available to read
+   */
+  @Test
+  public void testAbort() throws IOException {

Review comment:
       Note to reviewer: here is the only place where we test abort for Datasource path. We couldn't test it elsewhere (TestHoodieRowCreateHandle, TestHoodieInternalRowParquetWriter, TestHoodieBulkInsertDataInternalWriter)

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
           throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
         }
       }
-      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: common.util.Option[String], compactionInstantRetVal: common.util.Option[String],
+      writeClientRetVal: HoodieWriteClient[HoodieRecordPayload[Nothing]], tableConfigRetVal: HoodieTableConfig) =
+         if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        // register classes & schemas
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
 
-          // Create the table if not present
-          if (!exists) {
-            //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
-            val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
-              path.get, HoodieTableType.valueOf(tableType),
-              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
-            tableConfig = tableMetaClient.getTableConfig
-          }
+        // Handle various save modes
+        if (mode == SaveMode.ErrorIfExists && exists) {
+          throw new HoodieException(s"hoodie table at $basePath already exists.")
+        }
 
-          // Create a HoodieWriteClient & issue the write.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
-          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+        val (success, commitTime: common.util.Option[String]) =
+          if (mode == SaveMode.Ignore && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+            (false, common.util.Option.ofNullable(instantTime))
+          } else {
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val hoodieRecords =
-            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName,
+              mapAsJavaMap(parameters))
+
+            val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
+            hoodieDF.write.format("org.apache.hudi.internal").option(INSTANT_TIME, instantTime)
+              .options(parameters).save()
+            val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+            val syncHiveSucess = if (hiveSyncEnabled) {
+              log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+              val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+              syncHive(basePath, fs, parameters)
             } else {
-              hoodieAllIncomingRecords
+              true
             }
-
-          if (hoodieRecords.isEmpty()) {
-            log.info("new batch has no new records, skipping...")
-            (true, common.util.Option.empty())
+            (syncHiveSucess, common.util.Option.ofNullable(instantTime))
           }
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-          (writeStatuses, client)
-        } else {
+        (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)
+       } else {
+        val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =

Review comment:
       Note to reviewer: there are no changes in the else section; it is the same as before for all other write operations. GitHub does not show the difference well. If possible, please verify that nothing changed there, as I had to manually resolve a lot of conflicts during the rebase.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469399755



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.TypedProperties
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+ * WriterUtils to assist in write path in Datasource and tests.
+ */
+object HoodieWriterUtils {
+
+  def javaParametersWithWriteDefaults(parameters: java.util.Map[String, String]): java.util.Map[String, String] = {
+    mapAsJavaMap(parametersWithWriteDefaults(parameters.asScala.toMap))
+  }
+
+  /**
+    * Add default options for unspecified write options keys.
+    *
+    * @param parameters
+    * @return
+    */
+  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
+    Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
+      TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
+      PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
+      PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
+      RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
+      PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
+      KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+      COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
+      INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
+      STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
+      STREAMING_RETRY_INTERVAL_MS_OPT_KEY -> DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL,
+      STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL,
+      META_SYNC_CLIENT_TOOL_CLASS -> DEFAULT_META_SYNC_CLIENT_TOOL_CLASS,
+      HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
+      META_SYNC_ENABLED_OPT_KEY -> DEFAULT_META_SYNC_ENABLED_OPT_VAL,
+      HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
+      HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
+      HIVE_BASE_FILE_FORMAT_OPT_KEY -> DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL,
+      HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
+      HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
+      HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
+      HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
+      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
+      HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL,
+      HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL,
+      ASYNC_COMPACT_ENABLE_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_VAL

Review comment:
       I added this from `HoodieSparkSqlWriter` so we can have just one method.
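
As a point of reference, a defaults helper of this kind usually layers caller-supplied options over a map of defaults. The hunk above is cut off before the merge step, so the following is only a sketch of that general pattern in Java. The class and method names are hypothetical, and of the two keys shown only the async compaction key appears in this diff; the other is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a "defaults, then overrides" merge; not the Hudi helper.
public class WriteDefaultsSketch {

  // Returns a new map in which user-supplied values win over defaults.
  // Neither input map is modified.
  public static Map<String, String> withDefaults(Map<String, String> defaults, Map<String, String> userParams) {
    Map<String, String> merged = new HashMap<>(defaults);
    merged.putAll(userParams);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("hoodie.datasource.compaction.async.enable", "true");
    defaults.put("example.write.operation", "upsert"); // made-up key for illustration

    Map<String, String> userParams = new HashMap<>();
    userParams.put("example.write.operation", "bulk_insert");

    Map<String, String> merged = withDefaults(defaults, userParams);
    System.out.println(merged.get("example.write.operation"));
    System.out.println(merged.get("hoodie.datasource.compaction.async.enable"));
  }
}
```

Keeping the merge in a single helper means the datasource write path and the tests resolve options the same way, which is the motivation given in the comment above.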

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieRowParquetWriteSupport.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing Row to Parquet.
+ */
+public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
+
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter) {
+    super();
+    Configuration hadoopConf = new Configuration(conf);
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");

Review comment:
       Yes. Why are we hardcoding this? Any ideas, @bvaradar?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
##########
@@ -35,6 +35,7 @@
   // bulk insert
   BULK_INSERT("bulk_insert"),
   BULK_INSERT_PREPPED("bulk_insert_prepped"),
+  BULK_INSERT_DATASET("bulk_insert_dataset"),

Review comment:
       removing this. it was easy enough. 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -297,8 +298,9 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false"
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
-
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_KEY = "hoodie.datasource.compaction.async.enable"
   val DEFAULT_ASYNC_COMPACT_ENABLE_VAL = "true"
+  // Internal configs

Review comment:
       this is publicly visible. cannot add this here. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
+ * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link InternalRow}.
+ */
+public class HoodieInternalRow extends InternalRow {
+
+  private String commitTime;
+  private String commitSeqNumber;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private InternalRow row;
+
+  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
+      String fileName, InternalRow row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+  }
+
+  @Override
+  public int numFields() {
+    return row.numFields();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = null;
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = null;
+          break;
+        }
+        case 2: {
+          this.recordKey = null;
+          break;
+        }
+        case 3: {
+          this.partitionPath = null;
+          break;
+        }
+        case 4: {
+          this.fileName = null;
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.setNullAt(i);

Review comment:
       I think this is because row already has these metafields per se in the schema
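The point above can be sketched in plain Java. This is a minimal illustration, assuming the wrapped row already reserves ordinals for the meta columns, of why the fall-through branch can pass the ordinal through unchanged. `MetaOverlayRow`, `META_COLUMN_COUNT` and the array-backed row are hypothetical stand-ins, not the Hudi or Spark classes.

```java
/** Hypothetical stand-in for a row wrapper that keeps its own copy of the meta columns. */
final class MetaOverlayRow {
  // Assumed number of leading meta columns (five are handled in the diff above).
  static final int META_COLUMN_COUNT = 5;

  private final String[] meta = new String[META_COLUMN_COUNT];
  // The wrapped row still has slots for the meta columns, so ordinals line up 1:1.
  private final Object[] wrapped;

  MetaOverlayRow(String[] metaValues, Object[] wrapped) {
    // metaValues is expected to hold at least META_COLUMN_COUNT entries.
    System.arraycopy(metaValues, 0, meta, 0, META_COLUMN_COUNT);
    this.wrapped = wrapped;
  }

  int numFields() {
    return wrapped.length;
  }

  Object get(int ordinal) {
    // Meta ordinals are served from the local copy; everything else is delegated as-is.
    return ordinal < META_COLUMN_COUNT ? meta[ordinal] : wrapped[ordinal];
  }

  void setNullAt(int ordinal) {
    if (ordinal < META_COLUMN_COUNT) {
      meta[ordinal] = null;
    } else {
      // No offset arithmetic: the wrapped row uses the same ordinal.
      wrapped[ordinal] = null;
    }
  }
}
```

If the wrapped row did not carry the meta columns, the delegate call would need `ordinal - META_COLUMN_COUNT` instead.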

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -670,7 +670,9 @@ public Builder withPath(String basePath) {
     }
 
     public Builder withSchema(String schemaStr) {
-      props.setProperty(AVRO_SCHEMA, schemaStr);
+      if (null != schemaStr) {

Review comment:
       I did that. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468975269



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -125,49 +130,58 @@ public TimestampBasedKeyGenerator(TypedProperties config, String partitionPathFi
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, partitionPathField, true);
+    Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true);
     if (partitionVal == null) {
       partitionVal = 1L;
     }
+    try {
+      return getPartitionPath(partitionVal);
+    } catch (Exception e) {
+      throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e);
+    }
+  }
 
+  /**
+   * Parse and fetch partition path based on data type.
+   *
+   * @param partitionVal partition path object value fetched from record/row
+   * @return the parsed partition path based on data type
+   * @throws ParseException on any parse exception
+   */
+  private String getPartitionPath(Object partitionVal) throws ParseException {
     DateTimeFormatter partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
     if (this.outputDateTimeZone != null) {
       partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
     }
-
-    try {
-      long timeMs;
-      if (partitionVal instanceof Double) {
-        timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
-      } else if (partitionVal instanceof Float) {
-        timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
-      } else if (partitionVal instanceof Long) {
-        timeMs = convertLongTimeToMillis((Long) partitionVal);
-      } else if (partitionVal instanceof CharSequence) {
-        DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString());
-        if (this.outputDateTimeZone == null) {
-          // Use the timezone that came off the date that was passed in, if it had one
-          partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
-        }
-
-        timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis();
-      } else {
-        throw new HoodieNotSupportedException(
-            "Unexpected type for partition field: " + partitionVal.getClass().getName());
+    long timeMs;

Review comment:
       Note to reviewer: removed the outer try/catch and moved it to the caller. Other than that, there are no code changes.
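The refactoring described in this note can be sketched as follows. This is a simplified illustration, not the patch itself: the helper declares the checked `ParseException` and contains no try/catch, while the caller wraps any failure once. `PartitionPathSketch`, the fixed date patterns, and the use of `java.text.SimpleDateFormat` and `IllegalStateException` (in place of Joda-Time and the Hudi exception types) are assumptions made to keep the example self-contained.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

final class PartitionPathSketch {

  // Helper: parses by data type and lets the checked exception propagate.
  static String format(Object value) throws ParseException {
    SimpleDateFormat out = new SimpleDateFormat("yyyy/MM/dd");
    out.setTimeZone(TimeZone.getTimeZone("UTC"));
    long timeMs;
    if (value instanceof Long) {
      timeMs = (Long) value;
    } else if (value instanceof CharSequence) {
      SimpleDateFormat in = new SimpleDateFormat("yyyy-MM-dd");
      in.setTimeZone(TimeZone.getTimeZone("UTC"));
      timeMs = in.parse(value.toString()).getTime();
    } else {
      throw new IllegalArgumentException(
          "Unexpected type for partition field: " + value.getClass().getName());
    }
    return out.format(new Date(timeMs));
  }

  // Caller: the only place where failures are caught and wrapped.
  static String partitionPath(Object value) {
    Object partitionVal = value == null ? 1L : value;
    try {
      return format(partitionVal);
    } catch (Exception e) {
      throw new IllegalStateException("Unable to parse input partition field :" + partitionVal, e);
    }
  }
}
```

With this split, a second entry point (for example one that reads the value from a Row rather than a GenericRecord) could reuse `format` and apply its own error handling.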

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
##########
@@ -22,30 +22,27 @@
 import org.apache.hudi.common.config.TypedProperties;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
- * Key generator for deletes using global indices. Global index deletes do not require partition value
- * so this key generator avoids using partition value for generating HoodieKey.
+ * Key generator for deletes using global indices. Global index deletes do not require partition value so this key generator avoids using partition value for generating HoodieKey.
  */
 public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
 
   private static final String EMPTY_PARTITION = "";
 
-  protected final List<String> recordKeyFields;
-
   public GlobalDeleteKeyGenerator(TypedProperties config) {
     super(config);
-    this.recordKeyFields = Arrays.stream(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
+    this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));

Review comment:
       This line is different from what I see before this patch. It is an optimization, but just to be safe, we can keep it as is. 
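One observable difference between the two lines in the diff above is that the new one no longer trims each field name. A minimal sketch of that difference, using only the JDK; `KeyFieldParsing` and its method names are hypothetical and exist only for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class KeyFieldParsing {

  // Pre-patch style: split on commas and trim surrounding whitespace from each name.
  static List<String> splitAndTrim(String csv) {
    return Arrays.stream(csv.split(",")).map(String::trim).collect(Collectors.toList());
  }

  // Post-patch style: split only, so whitespace around a name is preserved.
  static List<String> splitOnly(String csv) {
    return Arrays.asList(csv.split(","));
  }
}
```

For a config value such as `"id, ts"`, the first variant yields `id` and `ts`, while the second yields `id` and ` ts` (with a leading space), which would not match a field named `ts`.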







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469484328



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -297,8 +298,9 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false"
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
-
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_KEY = "hoodie.datasource.compaction.async.enable"
   val DEFAULT_ASYNC_COMPACT_ENABLE_VAL = "true"
+  // Internal configs

Review comment:
       cc @bvaradar @nsivabalan let us think very proactively about adding anything to public classes :) . Also I notice `INSTANT_TIME` does not follow the key naming conventions in the rest of the configs. these little things add up.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468885195



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieRowParquetWriteSupport.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing Row to Parquet.
+ */
+public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
+
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter) {
+    super();
+    Configuration hadoopConf = new Configuration(conf);
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");

Review comment:
       Nope. We need to fix this. The built-in ParquetWriteSupport expects these two params to be set. I will double-check once again to confirm this.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468885750



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       Hmm, not sure about this. I will reconcile with Balaji on this.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468883660



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
##########
@@ -55,21 +51,22 @@ public SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, recordKeyField);
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));

Review comment:
       We wanted to have the same behavior as getKey(). We don't throw an exception in the constructor if the record key is not found; we throw only when getKey(GenericRecord record) is called.
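The behavior described here (no validation at construction time, failure only when a key is requested) can be sketched as below. This is a hedged illustration, not the Hudi implementation: `LazyKeyGenSketch`, the `Map`-based record, and the use of `IllegalStateException` in place of the Hudi exception type are all assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class LazyKeyGenSketch {
  // May be null when the record key field is not configured.
  private final List<String> recordKeyFields;

  LazyKeyGenSketch(String recordKeyCsv) {
    // Deliberately no validation here: a missing config does not fail construction.
    this.recordKeyFields = recordKeyCsv == null ? null : Arrays.asList(recordKeyCsv.split(","));
  }

  String getKey(Map<String, String> record) {
    // Validation is deferred to the first call that actually needs the fields.
    if (recordKeyFields == null) {
      throw new IllegalStateException("Unable to find field names for record key in cfg");
    }
    String field = recordKeyFields.get(0);
    String value = record.get(field);
    if (value == null) {
      throw new IllegalStateException("recordKey value not found for field: \"" + field + "\"");
    }
    return value;
  }
}
```

The trade-off is that a misconfiguration surfaces at write time rather than at startup, in exchange for every access path failing in the same way.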







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469922118



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -85,71 +84,40 @@ public final HoodieKey getKey(GenericRecord record) {
     }).collect(Collectors.toList());
   }
 
-  @Override
-  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {
-    // parse simple feilds
-    getRecordKeyFields().stream()
-        .filter(f -> !(f.contains(".")))
-        .forEach(f -> {
-          if (structType.getFieldIndex(f).isDefined()) {
-            recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
-          } else {
-            throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
-          }
-        });
-    // parse nested fields
-    getRecordKeyFields().stream()
-        .filter(f -> f.contains("."))
-        .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
-    // parse simple fields
-    if (getPartitionPathFields() != null) {
-      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+  void buildFieldPositionMapIfNeeded(StructType structType) {
+    if (this.structType == null) {
+      // parse simple fields
+      getRecordKeyFields().stream()
+          .filter(f -> !(f.contains(".")))
           .forEach(f -> {
             if (structType.getFieldIndex(f).isDefined()) {
-              partitionPathPositions.put(f,
-                  Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
             } else {
-              partitionPathPositions.put(f, Collections.singletonList(-1));
+              throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
             }
           });
       // parse nested fields
-      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
-          .forEach(f -> partitionPathPositions.put(f,
-              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
-    }
-    this.structName = structName;
-    this.structType = structType;
-    this.recordNamespace = recordNamespace;
-  }
-
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
-  @Override
-  public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionHelper.createConverterToAvro(structType, structName, recordNamespace);
+      getRecordKeyFields().stream()
+          .filter(f -> f.contains("."))
+          .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+      // parse simple fields
+      if (getPartitionPathFields() != null) {
+        getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+            .forEach(f -> {
+              if (structType.getFieldIndex(f).isDefined()) {
+                partitionPathPositions.put(f,
+                    Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              } else {
+                partitionPathPositions.put(f, Collections.singletonList(-1));
+              }
+            });
+        // parse nested fields
+        getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
+            .forEach(f -> partitionPathPositions.put(f,
+                RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+      }
+      this.structType = structType;

Review comment:
       May I know where structType is being used? AvroConversionHelper.createConverterToAvro uses row.schema(), so we may not need it. We should probably replace this with a boolean, positionMapInitialized.
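The suggestion above, guarding the one-time build with a boolean rather than a retained schema reference, could look roughly like this. It is a sketch under simplifying assumptions: the schema is modeled as a list of field names, nested fields are ignored, and `PositionMapSketch`, `positionOf` and `buildCount` are hypothetical names added for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PositionMapSketch {
  private final Map<String, Integer> recordKeyPositions = new HashMap<>();
  // Explicit flag instead of checking whether a stored schema reference is null.
  private boolean positionMapInitialized = false;
  // Counts how many times the map was actually built (for demonstration only).
  private int buildCount = 0;

  void buildFieldPositionMapIfNeeded(List<String> schemaFieldNames, List<String> keyFields) {
    if (positionMapInitialized) {
      return; // already built; later calls are no-ops
    }
    for (String field : keyFields) {
      int idx = schemaFieldNames.indexOf(field);
      if (idx < 0) {
        throw new IllegalArgumentException("recordKey value not found for field: \"" + field + "\"");
      }
      recordKeyPositions.put(field, idx);
    }
    buildCount++;
    positionMapInitialized = true;
  }

  // Returns the cached position; the field must have been registered by the build step.
  int positionOf(String field) {
    return recordKeyPositions.get(field);
  }

  int buildCount() {
    return buildCount;
  }
}
```

Because the flag is set only after the loop completes, a failed build leaves the generator uninitialized and the next call retries.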







[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468883660



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
##########
@@ -55,21 +51,22 @@ public SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, recordKeyField);
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));

Review comment:
       We wanted to have the same behavior as getKey(). We don't throw an exception in the constructor if the record key is not found; we throw only when getKey(GenericRecord record) is called.







[GitHub] [hudi] nsivabalan commented on pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#issuecomment-671892573


   https://github.com/apache/hudi/pull/1834#discussion_r461939866
   : Because this is for Row, whereas the existing WriteStats is for HoodieRecords.





[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468881817



##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();

Review comment:
       Yes, you could do that. I vaguely remember running into some issues, after which I went with positions. I don't remember exactly; I might have to code it up to check.







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r469400924



##########
File path: hudi-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.STRUCT_TYPE;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getInternalRowWithError;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests {@link HoodieRowCreateHandle}.
+ */
+public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
+
+  private static final Random RANDOM = new Random();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieRowCreateHandle");
+    initPath();
+    initFileSystem();
+    initTestDataGenerator();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testRowCreateHandle() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
+    List<String> fileNames = new ArrayList<>();
+    List<String> fileAbsPaths = new ArrayList<>();
+
+    Dataset<Row> totalInputRows = null;
+    // one round per partition
+    for (int i = 0; i < 5; i++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3];
+
+      // init some args
+      String fileId = UUID.randomUUID().toString();
+      String instantTime = "000";
+
+      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE);
+      int size = 10 + RANDOM.nextInt(1000);
+      // Generate inputs
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+
+      // issue writes
+      HoodieInternalWriteStatus writeStatus = writeAndGetWriteStatus(inputRows, handle);
+
+      fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
+      fileNames.add(handle.getFileName());
+      // verify output
+      assertOutput(writeStatus, size, fileId, partitionPath, instantTime, totalInputRows, fileNames, fileAbsPaths);
+    }
+  }
+
+  /**
+   * Issues corrupted or wrongly schematized InternalRows after a few valid InternalRows so that a global error is thrown:
+   * write batch 1 of valid records, then write batch 2 of invalid records. A global error should be thrown.
+   */
+  @Test
+  public void testGlobalFailure() throws IOException {

Review comment:
       @nsivabalan is there a JIRA tracking this?







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468311076



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
           throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
         }
       }
-      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: common.util.Option[String], compactionInstantRetVal: common.util.Option[String],
+      writeClientRetVal: HoodieWriteClient[HoodieRecordPayload[Nothing]], tableConfigRetVal: HoodieTableConfig) =
+         if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        // register classes & schemas
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
 
-          // Create the table if not present
-          if (!exists) {
-            //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
-            val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
-              path.get, HoodieTableType.valueOf(tableType),
-              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
-            tableConfig = tableMetaClient.getTableConfig
-          }
+        // Handle various save modes
+        if (mode == SaveMode.ErrorIfExists && exists) {
+          throw new HoodieException(s"hoodie table at $basePath already exists.")
+        }
 
-          // Create a HoodieWriteClient & issue the write.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
-          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+        val (success, commitTime: common.util.Option[String]) =
+          if (mode == SaveMode.Ignore && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+            (false, common.util.Option.ofNullable(instantTime))
+          } else {
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val hoodieRecords =
-            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName,
+              mapAsJavaMap(parameters))
+
+            val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
+            hoodieDF.write.format("org.apache.hudi.internal").option(INSTANT_TIME, instantTime)
+              .options(parameters).save()
+            val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+            val syncHiveSucess = if (hiveSyncEnabled) {
+              log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+              val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+              syncHive(basePath, fs, parameters)
             } else {
-              hoodieAllIncomingRecords
+              true
             }
-
-          if (hoodieRecords.isEmpty()) {
-            log.info("new batch has no new records, skipping...")
-            (true, common.util.Option.empty())
+            (syncHiveSucess, common.util.Option.ofNullable(instantTime))
           }
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-          (writeStatuses, client)
-        } else {
+        (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)
+       } else {
+        val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =

Review comment:
       In general, we should do these rebase/prep PRs upfront so that the reviews are easier.







[GitHub] [hudi] vinothchandar commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r461939866



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Hoodie's internal write status used in datasource implementation of bulk insert.
+ */
+public class HoodieInternalWriteStatus implements Serializable {

Review comment:
       So, why does this need to be a separate class?

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
##########
@@ -54,12 +51,17 @@ public String getPartitionPath(GenericRecord record) {
   }
 
   @Override
-  public List<String> getRecordKeyFields() {
-    return recordKeyFields;
+  public List<String> getPartitionPathFields() {
+    return new ArrayList<>();
   }
 
   @Override
-  public List<String> getPartitionPathFields() {
-    return new ArrayList<>();
+  public String getRecordKeyFromRow(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), true);
+  }
+
+  @Override
+  public String getPartitionPathFromRow(Row row) {

Review comment:
       One issue we need to think about is how we abstract the key generators out, so that even Flink etc. can use this. Ideally we need to templatize `GenericRecord` and `Row`. This needs more thought and is potentially beyond the scope of this PR.
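
The templatizing idea in the comment above could look roughly like the following minimal Java sketch. It is not code from this PR: `RecordKeyGenerator<T>`, `SimpleFieldKeyGenerator`, the field-accessor function, and the `field:value` key format are all hypothetical and chosen only for illustration, and a plain `Map` stands in for an engine record type such as `GenericRecord` or `Row`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class GenericKeyGenSketch {

  /** Hypothetical engine-agnostic contract; T is the engine's record type. */
  public interface RecordKeyGenerator<T> {
    String getRecordKey(T record);

    String getPartitionPath(T record);
  }

  /**
   * Hypothetical implementation: the key-building logic is shared, and only
   * the field accessor knows how to read a named field from a record of type T.
   */
  public static final class SimpleFieldKeyGenerator<T> implements RecordKeyGenerator<T> {
    private final List<String> recordKeyFields;
    private final String partitionPathField;
    private final BiFunction<T, String, Object> fieldAccessor;

    public SimpleFieldKeyGenerator(List<String> recordKeyFields, String partitionPathField,
                                   BiFunction<T, String, Object> fieldAccessor) {
      this.recordKeyFields = recordKeyFields;
      this.partitionPathField = partitionPathField;
      this.fieldAccessor = fieldAccessor;
    }

    @Override
    public String getRecordKey(T record) {
      // Illustrative format only: "field:value" pairs joined by commas.
      return recordKeyFields.stream()
          .map(field -> field + ":" + fieldAccessor.apply(record, field))
          .collect(Collectors.joining(","));
    }

    @Override
    public String getPartitionPath(T record) {
      return String.valueOf(fieldAccessor.apply(record, partitionPathField));
    }
  }

  public static void main(String[] args) {
    // A Map stands in for an engine-specific record (assumption for this sketch).
    Map<String, Object> record = new HashMap<>();
    record.put("id", 42);
    record.put("region", "us-west");

    RecordKeyGenerator<Map<String, Object>> keyGen =
        new SimpleFieldKeyGenerator<Map<String, Object>>(
            Arrays.asList("id"), "region", (r, f) -> r.get(f));

    System.out.println(keyGen.getRecordKey(record));
    System.out.println(keyGen.getPartitionPath(record));
  }
}
```

Under this assumption, each engine would supply only the accessor (for example, one that reads from a `GenericRecord` and another that reads from a `Row`), while the key-building logic stays in one place.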

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -129,6 +129,7 @@ object DataSourceWriteOptions {
   val INSERT_OPERATION_OPT_VAL = "insert"
   val UPSERT_OPERATION_OPT_VAL = "upsert"
   val DELETE_OPERATION_OPT_VAL = "delete"
+  val BULK_INSERT_DATASET_OPERATION_OPT_VAL = "bulk_insert_dataset"

Review comment:
       We need not overload the operation type here. We can just introduce a separate boolean option.
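
A minimal Java sketch of the suggestion above: the operation stays `bulk_insert`, and a separate boolean option selects the row-based write path. The option key `hoodie.datasource.write.row.writer.enable`, its default, and the helper `useRowWriterPath` are assumptions made for illustration; they are not taken from this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class RowWriterOptionSketch {

  static final String OPERATION_OPT_KEY = "hoodie.datasource.write.operation";
  static final String BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert";
  static final String DEFAULT_OPERATION_OPT_VAL = "upsert";

  // Hypothetical boolean option (name and default are assumptions for this sketch).
  static final String ENABLE_ROW_WRITER_OPT_KEY = "hoodie.datasource.write.row.writer.enable";
  static final String DEFAULT_ENABLE_ROW_WRITER_OPT_VAL = "false";

  /** Returns true only when the operation is bulk_insert and the boolean flag is set. */
  public static boolean useRowWriterPath(Map<String, String> parameters) {
    String operation = parameters.getOrDefault(OPERATION_OPT_KEY, DEFAULT_OPERATION_OPT_VAL);
    boolean rowWriterEnabled = Boolean.parseBoolean(
        parameters.getOrDefault(ENABLE_ROW_WRITER_OPT_KEY, DEFAULT_ENABLE_ROW_WRITER_OPT_VAL));
    return BULK_INSERT_OPERATION_OPT_VAL.equalsIgnoreCase(operation) && rowWriterEnabled;
  }

  public static void main(String[] args) {
    Map<String, String> parameters = new HashMap<>();
    parameters.put(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL);
    parameters.put(ENABLE_ROW_WRITER_OPT_KEY, "true");
    System.out.println(useRowWriterPath(parameters));
  }
}
```

With a flag like this, the set of operation values does not grow, and existing `bulk_insert` users keep the current behavior unless they opt in.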
   



