You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/06 14:21:27 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #1834: [HUDI-1013] Adding Bulk Insert V2 implementation

nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r466385455



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with InternalRow for datasource implemention of bulk insert.
+ */
+public class HoodieRowCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+      StructType structType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    try {
+      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+      String recordKey = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString();
+      HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
+          record);
+      try {
+        fileWriter.writeRow(recordKey, internalRow);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable t) {
+        writeStatus.markFailure(recordKey, t);
+      }
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;
+    }
+  }
+
+  /**
+   * @returns {@code true} if this handle can take in more writes. else {@code false}.
+   */
+  public boolean canWrite() {
+    return fileWriter.canWrite();
+  }
+
+  /**
+   * Closes the {@link HoodieRowCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and
+   * status of the writes to this handle.
+   * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
+   * @throws IOException
+   */
+  public HoodieInternalWriteStatus close() throws IOException {
+    fileWriter.close();
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumWrites(writeStatus.getTotalRecords());
+    stat.setNumDeletes(0);
+    stat.setNumInserts(writeStatus.getTotalRecords());
+    stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+    stat.setFileId(fileId);
+    stat.setPath(new Path(writeConfig.getBasePath()), path);
+    long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path);
+    stat.setTotalWriteBytes(fileSizeInBytes);
+    stat.setFileSizeInBytes(fileSizeInBytes);
+    stat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+    HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
+    runtimeStats.setTotalCreateTime(currTimer.endTimer());
+    stat.setRuntimeStats(runtimeStats);
+    writeStatus.setStat(stat);
+    return writeStatus;
+  }
+
+  public String getFileName() {
+    return path.getName();
+  }
+
+  private Path makeNewPath(String partitionPath) {
+    Path path = FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath);
+    try {
+      fs.mkdirs(path); // create a new partition as needed.
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + path, e);
+    }
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, getWriteToken(), fileId,
+        tableConfig.getBaseFileFormat().getFileExtension()));
+  }
+
+  /**
+   * Creates an empty marker file corresponding to storage writer path.
+   *
+   * @param partitionPath Partition path
+   */
+  private void createMarkerFile(String partitionPath, String dataFileName) {

Review comment:
       Note to reviewer: these methods are copied from HoodieWriteHandle for now. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with InternalRow for datasource implemention of bulk insert.
+ */
+public class HoodieRowCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+      StructType structType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    try {
+      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(

Review comment:
       Note to reviewer: if this statement fails, we have to treat it as a global error rather than a per-record error, since we don't have the record key yet. This is different from how a HoodieRecord write happens. So, in these cases, rowCreateHandle will throw an exception and the caller is expected to close the rowCreateHandle.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {

Review comment:
       Note to reviewer: had to move this to hudi-spark as we need access to AvroConversionUtils for the Row-to-GenericRecord converter function.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -129,45 +134,54 @@ public String getPartitionPath(GenericRecord record) {
     if (partitionVal == null) {
       partitionVal = 1L;
     }
+    try {
+      return getPartitionPath(partitionVal);
+    } catch (Exception e) {
+      throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e);
+    }
+  }
 
+  /**
+   * Parse and fetch partition path based on data type.
+   *
+   * @param partitionVal partition path object value fetched from record/row
+   * @return the parsed partition path based on data type
+   * @throws ParseException on any parse exception
+   */
+  private String getPartitionPath(Object partitionVal) throws ParseException {

Review comment:
       Note to reviewer: no changes here; just moved the code to a private method for reuse.

##########
File path: hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
##########
@@ -116,20 +165,26 @@ public void testScalar() throws IOException {
 
     // timezone is GMT
     properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
-    HoodieKey hk5 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
+    TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk5 = keyGen.getKey(baseRecord);
     assertEquals(hk5.getPartitionPath(), "2024-10-04 12");
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    keyGen.initializeRowKeyGenerator(structType, testStructName, testNamespace);
+    assertEquals("2024-10-04 12", keyGen.getPartitionPathFromRow(baseRow));
   }
 
   @Test
   public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException {
     baseRecord.put("createTime", "2020-04-01T13:01:33.428Z");
     properties = this.getBaseKeyConfig(
-      "DATE_STRING",
-      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
-      "",
-      "",
-      "yyyyMMddHH",
-      "GMT");
+        "DATE_STRING",

Review comment:
       Note to reviewer: I have yet to add tests for these new methods. Got these as part of the rebase. Also, I noticed a few other test classes, one for each key generator, after rebasing. Will add tests to those new test classes by tomorrow.

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
##########
@@ -34,13 +36,28 @@ import org.scalatest.Assertions.fail
 class TestDataSourceDefaults {
 
   val schema = SchemaTestUtil.getComplexEvolvedSchema
+  val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)

Review comment:
       Note to reviewer: this is the test class where all key generators are tested for the Row APIs as well. Found new test classes for each key generator after rebasing. I have yet to add tests for the Row-based APIs to those new key generator test classes.

##########
File path: hudi-client/src/test/java/org/apache/hudi/testutils/HoodieDatasetTestUtils.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+
+/**
+ * Dataset test utils.
+ */
+public class HoodieDatasetTestUtils {
+
+  public static final StructType STRUCT_TYPE = new StructType(new StructField[] {

Review comment:
       Note to reviewer: Can't leverage HoodieTestDataGenerator since each record is expected to be in a certain format (meta columns followed by data columns). Hence introduced a new schema for testing "bulk insert dataset".

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Base class for all the built-in key generators. Contains methods structured for
- * code reuse amongst them.
- */
-public abstract class BuiltinKeyGenerator extends KeyGenerator {

Review comment:
       Note to reviewer: moved this file to hudi-spark 

##########
File path: hudi-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.STRUCT_TYPE;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getInternalRowWithError;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests {@link HoodieRowCreateHandle}.
+ */
+public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
+
+  private static final Random RANDOM = new Random();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieRowCreateHandle");
+    initPath();
+    initFileSystem();
+    initTestDataGenerator();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testRowCreateHandle() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
+    List<String> fileNames = new ArrayList<>();
+    List<String> fileAbsPaths = new ArrayList<>();
+
+    Dataset<Row> totalInputRows = null;
+    // one round per partition
+    for (int i = 0; i < 5; i++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3];
+
+      // init some args
+      String fileId = UUID.randomUUID().toString();
+      String instantTime = "000";
+
+      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE);
+      int size = 10 + RANDOM.nextInt(1000);
+      // Generate inputs
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+
+      // issue writes
+      HoodieInternalWriteStatus writeStatus = writeAndGetWriteStatus(inputRows, handle);
+
+      fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
+      fileNames.add(handle.getFileName());
+      // verify output
+      assertOutput(writeStatus, size, fileId, partitionPath, instantTime, totalInputRows, fileNames, fileAbsPaths);
+    }
+  }
+
+  /**
+   * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch 2 of invalid records Global Error
+   * should be thrown.
+   */
+  @Test
+  public void testGlobalFailure() throws IOException {

Review comment:
       Note to reviewer: as mentioned above, if there is an error parsing the partition path or record key, it will result in a global error for the handle and not a per-record/row error.
   I couldn't repro/test a per-record error. I tried writing a different datatype to one of the data columns expecting the write to fail, but it didn't fail. So, as of now, there are no tests for per-record failures. The same applies to RowFileWriter, InternalWriter, etc.

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
##########
@@ -51,4 +53,32 @@ protected KeyGenerator(TypedProperties config) {
     throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
         + "Please override this method in your custom key generator.");
   }
+
+  /**
+   * Initializes {@link KeyGenerator} for {@link Row} based operations.
+   * @param structType structype of the dataset.
+   * @param structName struct name of the dataset.
+   * @param recordNamespace record namespace of the dataset.
+   */
+  public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) {

Review comment:
       Note to reviewer: introduced these new APIs for Row-based KeyGen. All built-in key generators implement them. Users with a custom key generator don't need to implement these APIs unless they use "bulk_insert_dataset"; if they wish to use "bulk_insert_dataset", they might have to provide implementations of these methods.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Helper class to fetch fields from Row.
+ */
+public class RowKeyGeneratorHelper {
+
+  /**
+   * Generates record key for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param recordKeyFields record key fields as a list
+   * @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields}
+   * @param prefixFieldName {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise.
+   * @return the record key thus generated
+   */
+  public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, List<Integer>> recordKeyPositions, boolean prefixFieldName) {
+    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+    String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
+      String field = recordKeyFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = recordKeyPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple field
+        Integer fieldPos = fieldPositions.get(0);
+        if (row.isNullAt(fieldPos)) {
+          val = NULL_RECORDKEY_PLACEHOLDER;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = EMPTY_RECORDKEY_PLACEHOLDER;
+          } else {
+            keyIsNullOrEmpty.set(false);
+          }
+        }
+      } else { // nested fields
+        val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString();
+        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          keyIsNullOrEmpty.set(false);
+        }
+      }
+      return prefixFieldName ? (field + ":" + val) : val;
+    }).collect(Collectors.joining(","));
+    if (keyIsNullOrEmpty.get()) {
+      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generates partition path for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param partitionPathFields partition path fields as a list
+   * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
+   * @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields}
+   * @return the generated partition path for the row
+   */
+  public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, List<Integer>> partitionPathPositions) {
+    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
+      String field = partitionPathFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = partitionPathPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple
+        Integer fieldPos = fieldPositions.get(0);
+        // for partition path, if field is not found, index will be set to -1
+        if (fieldPos == -1 || row.isNullAt(fieldPos)) {
+          val = DEFAULT_PARTITION_PATH;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = DEFAULT_PARTITION_PATH;
+          }
+        }
+        if (hiveStylePartitioning) {
+          val = field + "=" + val;
+        }
+      } else { // nested
+        Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field));
+        if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          val = hiveStylePartitioning ? field + "=" + DEFAULT_PARTITION_PATH : DEFAULT_PARTITION_PATH;
+        } else {
+          val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString();
+        }
+      }
+      return val;
+    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
+  }
+
+  /**
+   * Fetch the field value located at the positions requested for.
+   * @param row instance of {@link Row} of interest
+   * @param positions tree style positions where the leaf node need to be fetched and returned
+   * @return the field value as per the positions requested for.
+   */
+  public static Object getNestedFieldVal(Row row, List<Integer> positions) {
+    if (positions.size() == 1 && positions.get(0) == -1) {

Review comment:
       Note to reviewer: getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) will return -1 for partitionPathIndices if partition path field is not found.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  private List<String> recordKeyFields;

Review comment:
       Note to reviewer: Have unified code across Simple and Complex key gens. 

##########
File path: hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Helper class to fetch fields from Row.
+ */
+public class RowKeyGeneratorHelper {
+
+  /**
+   * Generates record key for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param recordKeyFields record key fields as a list
+   * @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields}
+   * @param prefixFieldName {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise.
+   * @return the record key thus generated
+   */
+  public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, List<Integer>> recordKeyPositions, boolean prefixFieldName) {
+    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+    String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx -> {
+      String field = recordKeyFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = recordKeyPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple field
+        Integer fieldPos = fieldPositions.get(0);
+        if (row.isNullAt(fieldPos)) {
+          val = NULL_RECORDKEY_PLACEHOLDER;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = EMPTY_RECORDKEY_PLACEHOLDER;
+          } else {
+            keyIsNullOrEmpty.set(false);
+          }
+        }
+      } else { // nested fields
+        val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString();
+        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          keyIsNullOrEmpty.set(false);
+        }
+      }
+      return prefixFieldName ? (field + ":" + val) : val;
+    }).collect(Collectors.joining(","));
+    if (keyIsNullOrEmpty.get()) {
+      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generates partition path for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param partitionPathFields partition path fields as a list
+   * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
+   * @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields}
+   * @return the generated partition path for the row
+   */
+  public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, List<Integer>> partitionPathPositions) {
+    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
+      String field = partitionPathFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = partitionPathPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple
+        Integer fieldPos = fieldPositions.get(0);
+        // for partition path, if field is not found, index will be set to -1
+        if (fieldPos == -1 || row.isNullAt(fieldPos)) {
+          val = DEFAULT_PARTITION_PATH;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = DEFAULT_PARTITION_PATH;
+          }
+        }
+        if (hiveStylePartitioning) {
+          val = field + "=" + val;
+        }
+      } else { // nested
+        Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field));
+        if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          val = hiveStylePartitioning ? field + "=" + DEFAULT_PARTITION_PATH : DEFAULT_PARTITION_PATH;
+        } else {
+          val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString();
+        }
+      }
+      return val;
+    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
+  }
+
+  /**
+   * Fetch the field value located at the positions requested for.
+   * @param row instance of {@link Row} of interest
+   * @param positions tree style positions where the leaf node need to be fetched and returned
+   * @return the field value as per the positions requested for.
+   */
+  public static Object getNestedFieldVal(Row row, List<Integer> positions) {
+    // A lone -1 is the marker for a partition path field that could not be resolved.
+    if (positions.size() == 1 && positions.get(0) == -1) {
+      return DEFAULT_PARTITION_PATH;
+    }
+    final int lastIndex = positions.size() - 1;
+    Row valueToProcess = row;
+    // Stays null only when positions is empty.
+    Object toReturn = null;
+
+    for (int index = 0; index <= lastIndex; index++) {
+      final int position = positions.get(index);
+      // Null-check every level, including the leaf: calling toString() on a null leaf value
+      // would otherwise throw a NullPointerException instead of yielding the placeholder.
+      if (valueToProcess.isNullAt(position)) {
+        toReturn = NULL_RECORDKEY_PLACEHOLDER;
+        break;
+      }
+      if (index < lastIndex) {
+        // intermediate level: descend into the nested struct
+        valueToProcess = (Row) valueToProcess.get(position);
+      } else {
+        // leaf level: an empty string representation is reported via the empty placeholder
+        final Object leaf = valueToProcess.get(position);
+        toReturn = leaf.toString().isEmpty() ? EMPTY_RECORDKEY_PLACEHOLDER : leaf;
+      }
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generate the tree style positions for the field requested for as per the defined struct type.
+   * @param structType schema of interest
+   * @param field field of interest for which the positions are requested for
+   * @param isRecordKey {@code true} if the field requested for is a record key. {@code false} incase of a partition path.
+   * @return the positions of the field as per the struct type.
+   */
+  public static List<Integer> getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) {
+    String[] slices = field.split("\\.");
+    List<Integer> positions = new ArrayList<>();
+    int index = 0;
+    int totalCount = slices.length;
+    while (index < totalCount) {
+      String slice = slices[index];
+      Option<Object> curIndexOpt = structType.getFieldIndex(slice);
+      if (curIndexOpt.isDefined()) {
+        int curIndex = (int) curIndexOpt.get();
+        positions.add(curIndex);
+        final StructField nestedField = structType.fields()[curIndex];
+        if (index < totalCount - 1) {
+          if (!(nestedField.dataType() instanceof StructType)) {
+            if (isRecordKey) {
+              throw new HoodieKeyException("Nested field should be of type StructType " + nestedField);
+            } else {
+              positions = Collections.singletonList(-1);

Review comment:
       Note to reviewer: returning -1 only in the case of a partition path, so that getNestedFieldVal(Row row, List<Integer> positions) will return DEFAULT_PARTITION_PATH if the partition path field is not found. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Hoodie's internal write status used in datasource implementation of bulk insert.
+ */
+public class HoodieInternalWriteStatus implements Serializable {

Review comment:
       will address all feedback together. 

##########
File path: hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.STRUCT_TYPE;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Unit tests {@link HoodieDataSourceInternalWriter}.
+ */
+public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness {
+
+  private static final Random RANDOM = new Random();
+
+  /**
+   * Per-test fixture setup: runs the harness initializers below, in this order, before each test.
+   */
+  @BeforeEach
+  public void setUp() throws Exception {
+    // NOTE(review): presumably the later initializers rely on the earlier ones (e.g. the meta client on the base path) — confirm before reordering.
+    initSparkContexts("TestHoodieDataSourceInternalWriter");
+    initPath();
+    initFileSystem();
+    initTestDataGenerator();
+    initMetaClient();
+  }
+
+  /**
+   * Per-test teardown: delegates entirely to {@code cleanupResources()}.
+   */
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testDataSourceWriter() throws IOException {
+    // config, instant and writer under test
+    final HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();
+    final String commitTime = "001";
+    final HoodieDataSourceInternalWriter internalWriter =
+        new HoodieDataSourceInternalWriter(commitTime, writeConfig, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+    final DataWriter<InternalRow> dataWriter = internalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
+
+    // one read glob per default partition path
+    final String[] defaultPartitions = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+    final String[] readGlobs = new String[defaultPartitions.length];
+    for (int p = 0; p < defaultPartitions.length; p++) {
+      readGlobs[p] = basePath + "/" + defaultPartitions[p] + "/*";
+    }
+
+    final int rowsPerBatch = 10 + RANDOM.nextInt(1000);
+    final int batchCount = 5;
+    Dataset<Row> expectedRows = null;
+
+    // write every batch through the same writer, cycling over the first three default partitions
+    for (int batch = 0; batch < batchCount; batch++) {
+      Dataset<Row> batchRows = getRandomRows(sqlContext, rowsPerBatch, defaultPartitions[batch % 3], false);
+      writeRows(batchRows, dataWriter);
+      expectedRows = (expectedRows == null) ? batchRows : expectedRows.union(batchRows);
+    }
+
+    // commit on the data writer first, then on the data source writer
+    final HoodieWriterCommitMessage commitMessage = (HoodieWriterCommitMessage) dataWriter.commit();
+    internalWriter.commit(new HoodieWriterCommitMessage[] {commitMessage});
+    metaClient.reloadActiveTimeline();
+    Dataset<Row> actualRows = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), readGlobs);
+
+    // verify output
+    assertOutput(expectedRows, actualRows, commitTime);
+    assertWriteStatuses(commitMessage.getWriteStatuses(), batchCount, rowsPerBatch);
+  }
+
+  @Test
+  public void testMultipleDataSourceWrites() throws IOException {
+    // 5 commits; each writes 5 batches (cycling over 3 partitions) of 10..1009 rows
+    writeAndVerifyCommits(5, 5, 10, 1000);
+  }
+
+  @Test
+  public void testLargeWrites() throws IOException {
+    // 3 commits; each writes 3 batches (one per partition) of 10000..19999 rows
+    writeAndVerifyCommits(3, 3, 10000, 10000);
+  }
+
+  /**
+   * Runs {@code rounds} write/commit cycles through {@link HoodieDataSourceInternalWriter} and verifies each one.
+   * Every round uses the instant time {@code "00" + round}, creates a fresh data source writer and data writer,
+   * writes {@code batches} batches of random rows (cycling over the first three default partition paths),
+   * commits, reads the commit back and checks both the rows and the write statuses.
+   *
+   * @param rounds number of commits to perform
+   * @param batches number of batches written per commit
+   * @param minRowsPerBatch inclusive lower bound of the batch size picked for a round
+   * @param rowsPerBatchSpread exclusive bound of the random amount added to {@code minRowsPerBatch}
+   */
+  private void writeAndVerifyCommits(int rounds, int batches, int minRowsPerBatch, int rowsPerBatchSpread) throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    int partitionCounter = 0;
+
+    // execute N rounds
+    for (int i = 0; i < rounds; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieDataSourceInternalWriter dataSourceInternalWriter =
+          new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+
+      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+      Dataset<Row> totalInputRows = null;
+      DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong());
+
+      // the same size is used for every batch of this round
+      int size = minRowsPerBatch + RANDOM.nextInt(rowsPerBatchSpread);
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
+      commitMessages.add(commitMetadata);
+      dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
+      metaClient.reloadActiveTimeline();
+
+      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
+
+      // verify output
+      assertOutput(totalInputRows, result, instantTime);
+      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size);
+    }
+  }
+
+  /**
+   * Tests that DataSourceWriter.abort() aborts the written records of interest: write and commit batch1, then write and abort batch2. A read of the entire dataset should show only records from batch1.
+   * commit batch1
+   * abort batch2
+   * verify only records from batch1 are available to read
+   */
+  @Test
+  public void testAbort() throws IOException {

Review comment:
       Note to reviewer: here is the only place where we test abort for Datasource path. We couldn't test it elsewhere (TestHoodieRowCreateHandle, TestHoodieInternalRowParquetWriter, TestHoodieBulkInsertDataInternalWriter)

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
           throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
         }
       }
-      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: common.util.Option[String], compactionInstantRetVal: common.util.Option[String],
+      writeClientRetVal: HoodieWriteClient[HoodieRecordPayload[Nothing]], tableConfigRetVal: HoodieTableConfig) =
+         if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        // register classes & schemas
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
 
-          // Create the table if not present
-          if (!exists) {
-            //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
-            val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
-              path.get, HoodieTableType.valueOf(tableType),
-              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
-            tableConfig = tableMetaClient.getTableConfig
-          }
+        // Handle various save modes
+        if (mode == SaveMode.ErrorIfExists && exists) {
+          throw new HoodieException(s"hoodie table at $basePath already exists.")
+        }
 
-          // Create a HoodieWriteClient & issue the write.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
-          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+        val (success, commitTime: common.util.Option[String]) =
+          if (mode == SaveMode.Ignore && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+            (false, common.util.Option.ofNullable(instantTime))
+          } else {
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val hoodieRecords =
-            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName,
+              mapAsJavaMap(parameters))
+
+            val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
+            hoodieDF.write.format("org.apache.hudi.internal").option(INSTANT_TIME, instantTime)
+              .options(parameters).save()
+            val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+            val syncHiveSucess = if (hiveSyncEnabled) {
+              log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+              val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+              syncHive(basePath, fs, parameters)
             } else {
-              hoodieAllIncomingRecords
+              true
             }
-
-          if (hoodieRecords.isEmpty()) {
-            log.info("new batch has no new records, skipping...")
-            (true, common.util.Option.empty())
+            (syncHiveSucess, common.util.Option.ofNullable(instantTime))
           }
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-          (writeStatuses, client)
-        } else {
+        (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)
+       } else {
+        val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =

Review comment:
       Note to reviewer: there are no changes in the else section, which is the same as before for all write operations. GitHub does not show the difference well. Anyway, if possible, please verify that there are no changes, as I had to manually resolve a lot of conflicts during the rebase.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org